From a9b296c522b9a12d11a4d7cdbc36279c6991ce8e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 22 Feb 2017 16:51:10 +0000 Subject: [PATCH] Get the SQL working, and make it more idempotent (#17) * Get the SQL working, and make it more idempotent --- .../roomserver/input/latest_events.go | 15 +++++-- .../storage/event_state_keys_table.go | 3 +- .../roomserver/storage/event_types_table.go | 14 ++++--- .../roomserver/storage/events_table.go | 20 +++++++-- .../roomserver/storage/rooms_table.go | 5 +-- .../dendrite/roomserver/storage/storage.go | 41 +++++++++++-------- 6 files changed, 66 insertions(+), 32 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index c37c1700..0bd18ff7 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -58,15 +58,23 @@ func doUpdateLatestEvents( } // Check if this event references any of the latest events in the room. + var alreadyInLatest bool var newLatest []types.StateAtEventAndReference for _, l := range oldLatest { + keep := true for _, prevEvent := range prevEvents { if l.EventID == prevEvent.EventID && bytes.Compare(l.EventSHA256, prevEvent.EventSHA256) == 0 { // This event can be removed from the latest events cause we've found an event that references it. // (If an event is referenced by another event then it can't be one of the latest events in the room // because we have an event that comes after it) - continue + keep = false + break } + } + if l.EventNID == stateAtEvent.EventNID { + alreadyInLatest = true + } + if keep { // Keep the event in the latest events. newLatest = append(newLatest, l) } @@ -79,8 +87,9 @@ func doUpdateLatestEvents( return err } - if !alreadyReferenced { - // This event is not referenced by any of the events in the room. 
+ if !alreadyReferenced && !alreadyInLatest { + // This event is not referenced by any of the events in the room + // and the event is not already in the latest events. // Add it to the latest events newLatest = append(newLatest, types.StateAtEventAndReference{ StateAtEvent: stateAtEvent, diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go index ff4a76f2..88e1a560 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/event_state_keys_table.go @@ -31,8 +31,7 @@ INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES const insertEventStateKeyNIDSQL = "" + "INSERT INTO event_state_keys (event_state_key) VALUES ($1)" + " ON CONFLICT ON CONSTRAINT event_state_key_unique" + - " DO UPDATE SET event_state_key = $1" + - " RETURNING (event_state_key_nid)" + " DO NOTHING RETURNING (event_state_key_nid)" const selectEventStateKeyNIDSQL = "" + "SELECT event_state_key_nid FROM event_state_keys WHERE event_state_key = $1" diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/event_types_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/event_types_table.go index d8f132d6..8437747d 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/event_types_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/event_types_table.go @@ -50,14 +50,18 @@ INSERT INTO event_types (event_type_nid, event_type) VALUES // In that case the ID will be assigned using the next value from the sequence. // We use `RETURNING` to tell postgres to return the assigned ID. // But it's possible that the type was added in a query that raced with us. -// This will result in a conflict on the event_type_unique constraint. 
-// We peform a update that does nothing rather that doing nothing at all because -// postgres won't return anything unless we touch a row in the table. +// This will result in a conflict on the event_type_unique constraint; in this +// case we do nothing. Postgresql won't return a row in that case so we rely on +// the caller catching the sql.ErrNoRows error and running a select to get the row. +// We could get postgresql to return the row on a conflict by updating the row +// but it doesn't seem like a good idea to modify the rows just to make postgresql +// return it. Modifying the rows will cause postgres to assign a new tuple for the +// row even though the data doesn't change resulting in unnecessary modifications +// to the indexes. const insertEventTypeNIDSQL = "" + "INSERT INTO event_types (event_type) VALUES ($1)" + " ON CONFLICT ON CONSTRAINT event_type_unique" + - " DO UPDATE SET event_type = $1" + - " RETURNING (event_type_nid)" + " DO NOTHING RETURNING (event_type_nid)" const selectEventTypeNIDSQL = "" + "SELECT event_type_nid FROM event_types WHERE event_type = $1" diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index 4a8957dd..d89a1a0d 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -47,9 +47,12 @@ const insertEventSQL = "" + "INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids)" + " VALUES ($1, $2, $3, $4, $5, $6)" + " ON CONFLICT ON CONSTRAINT event_id_unique" + - " DO UPDATE SET event_id = $1" + + " DO NOTHING" + " RETURNING event_nid, state_snapshot_nid" +const selectEventSQL = "" + + "SELECT event_nid, state_snapshot_nid FROM events WHERE event_id = $1" + // Bulk lookup of events by string ID. // Sort by the numeric IDs for event type and state key. 
// This means we can use binary search to lookup entries by type and state key. @@ -71,6 +74,7 @@ const bulkSelectStateAtEventAndReferenceSQL = "" + type eventStatements struct { insertEventStmt *sql.Stmt + selectEventStmt *sql.Stmt bulkSelectStateEventByIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt updateEventStateStmt *sql.Stmt @@ -85,6 +89,9 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return } + if s.selectEventStmt, err = db.Prepare(selectEventSQL); err != nil { + return + } if s.bulkSelectStateEventByIDStmt, err = db.Prepare(bulkSelectStateEventByIDSQL); err != nil { return } @@ -119,6 +126,13 @@ func (s *eventStatements) insertEvent( return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err } +func (s *eventStatements) selectEvent(eventID string) (types.EventNID, types.StateSnapshotNID, error) { + var eventNID int64 + var stateNID int64 + err := s.selectEventStmt.QueryRow(eventID).Scan(&eventNID, &stateNID) + return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err +} + func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.StateEntry, error) { rows, err := s.bulkSelectStateEventByIDStmt.Query(pq.StringArray(eventIDs)) if err != nil { @@ -134,9 +148,9 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S for ; rows.Next(); i++ { result := &results[i] if err = rows.Scan( - &result.EventNID, &result.EventTypeNID, &result.EventStateKeyNID, + &result.EventNID, ); err != nil { return nil, err } @@ -163,9 +177,9 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types for ; rows.Next(); i++ { result := &results[i] if err = rows.Scan( - &result.EventNID, &result.EventTypeNID, &result.EventStateKeyNID, + &result.EventNID, &result.BeforeStateSnapshotNID, ); err != nil { return nil, err diff --git 
a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go index 07d668fa..78c1c883 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS rooms ( -- The most recent events in the room that aren't referenced by another event. -- This list may empty if the server hasn't joined the room yet. -- (The server will be in that state while it stores the events for the initial state of the room) - latest_event_nids BIGINT[] NOT NULL + latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[] ); ` @@ -24,8 +24,7 @@ CREATE TABLE IF NOT EXISTS rooms ( const insertRoomNIDSQL = "" + "INSERT INTO rooms (room_id) VALUES ($1)" + " ON CONFLICT ON CONSTRAINT room_id_unique" + - " DO UPDATE SET room_id = $1" + - " RETURNING (room_nid)" + " DO NOTHING RETURNING (room_nid)" const selectRoomNIDSQL = "" + "SELECT room_nid FROM rooms WHERE room_id = $1" diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index a49d0d84..11fc9f6c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -73,7 +73,13 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ event.EventReference().EventSHA256, authEventNIDs, ); err != nil { - return 0, types.StateAtEvent{}, err + if err == sql.ErrNoRows { + // We've already inserted the event so select the numeric event ID + eventNID, stateNID, err = d.statements.selectEvent(event.EventID()) + } + if err != nil { + return 0, types.StateAtEvent{}, err + } } if err = d.statements.insertEventJSON(eventNID, event.JSON()); err != nil { @@ -97,12 +103,13 @@ func (d *Database) assignRoomNID(roomID string) (types.RoomNID, error) { 
roomNID, err := d.statements.selectRoomNID(roomID) if err == sql.ErrNoRows { // We don't have a numeric ID so insert one into the database. - return d.statements.insertRoomNID(roomID) + roomNID, err = d.statements.insertRoomNID(roomID) + if err == sql.ErrNoRows { + // We raced with another insert so run the select again. + roomNID, err = d.statements.selectRoomNID(roomID) + } } - if err != nil { - return 0, err - } - return roomNID, nil + return roomNID, err } func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, error) { @@ -110,12 +117,13 @@ func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, err eventTypeNID, err := d.statements.selectEventTypeNID(eventType) if err == sql.ErrNoRows { // We don't have a numeric ID so insert one into the database. - return d.statements.insertEventTypeNID(eventType) + eventTypeNID, err = d.statements.insertEventTypeNID(eventType) + if err == sql.ErrNoRows { + // We raced with another insert so run the select again. + eventTypeNID, err = d.statements.selectEventTypeNID(eventType) + } } - if err != nil { - return 0, err - } - return eventTypeNID, nil + return eventTypeNID, err } func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) { @@ -123,12 +131,13 @@ func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyN eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey) if err == sql.ErrNoRows { // We don't have a numeric ID so insert one into the database. - return d.statements.insertEventStateKeyNID(eventStateKey) + eventStateKeyNID, err = d.statements.insertEventStateKeyNID(eventStateKey) + if err == sql.ErrNoRows { + // We raced with another insert so run the select again. 
+ eventStateKeyNID, err = d.statements.selectEventStateKeyNID(eventStateKey) + } } - if err != nil { - return 0, err - } - return eventStateKeyNID, nil + return eventStateKeyNID, err } // StateEntriesForEventIDs implements input.EventDatabase