diff --git a/src/github.com/matrix-org/dendrite/syncapi/config/config.go b/src/github.com/matrix-org/dendrite/syncapi/config/config.go index 433a7a2b..32f04722 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/syncapi/config/config.go @@ -20,6 +20,8 @@ import ( // Sync contains the config information necessary to spin up a sync-server process. type Sync struct { + // Where the room server is listening for queries. + RoomserverURL string `yaml:"roomserver_url"` // The topic for events which are written by the room server output log. RoomserverOutputTopic string `yaml:"roomserver_topic"` // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index b8ec98d2..31c48ac5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -16,6 +16,7 @@ package consumers import ( "encoding/json" + "fmt" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" @@ -33,6 +34,7 @@ type OutputRoomEvent struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase notifier *sync.Notifier + query api.RoomserverQueryAPI } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. @@ -51,6 +53,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS roomServerConsumer: &consumer, db: store, notifier: n, + query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil), } consumer.ProcessMessage = s.onMessage @@ -84,7 +87,19 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { "room_id": ev.RoomID(), }).Info("received event from roomserver") - syncStreamPos, err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs) + addsStateEvents, err := s.lookupStateEvents(output.AddsStateEventIDs, ev) + if err != nil { + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": output.AddsStateEventIDs, + "del": output.RemovesStateEventIDs, + }).Panicf("roomserver output log: state event lookup failure") + } + + syncStreamPos, err := s.db.WriteEvent( + &ev, addsStateEvents, output.AddsStateEventIDs, output.RemovesStateEventIDs, + ) if err != nil { // panic rather than continue with an inconsistent database @@ -100,3 +115,74 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { return nil } + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEvent) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + // Check if this is re-adding a state events that we previously processed + // If we have previously received a state event it may still be in + // our event database. + result, err := s.db.Events(addsStateEventIDs) + if err != nil { + return nil, err + } + missing := missingEventsFrom(result, addsStateEventIDs) + + // Check if event itself is being added. + for _, eventID := range missing { + if eventID == event.EventID() { + result = append(result, event) + break + } + } + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) == 0 { + return result, nil + } + + // At this point the missing events are neither the event itself nor are + // they present in our local database. Our only option is to fetch them + // from the roomserver using the query API. + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) != 0 { + return nil, fmt.Errorf( + "missing %d state events IDs at event %q", len(missing), event.EventID(), + ) + } + + return result, nil +} + +func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range required { + if !have[eventID] { + missing = append(missing, eventID) + } + } + return missing +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 28389ad9..d4f260e0 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -16,6 +16,7 @@ package storage import ( "database/sql" + "github.com/lib/pq" "github.com/matrix-org/gomatrixserverlib" ) @@ -35,6 +36,9 @@ CREATE TABLE IF NOT EXISTS current_room_state ( -- The 'content.membership' value if this event is an m.room.member event. For other -- events, this will be NULL. membership TEXT, + -- The serial ID of the output_room_events table when this event became + -- part of the current state of the room. + added_at BIGINT, -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key) ); @@ -45,9 +49,10 @@ CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key, ` const upsertRoomStateSQL = "" + - "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" + + "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT ON CONSTRAINT room_state_unique" + - " DO UPDATE SET event_id = $2, event_json = $5, membership = $6" + " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7" const deleteRoomStateByEventIDSQL = "" + "DELETE FROM current_room_state WHERE event_id = $1" @@ -61,12 +66,16 @@ const selectCurrentStateSQL = "" + const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" +const selectEventsWithEventIDsSQL = "" + + "SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt selectRoomIDsWithMembershipStmt *sql.Stmt selectCurrentStateStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt + selectEventsWithEventIDsStmt *sql.Stmt } func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { @@ -89,6 +98,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { return } + if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { + return + } return } @@ -141,6 +153,33 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri } defer rows.Close() + return rowsToEvents(rows) +} + +func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error { + _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) + return err +} + +func (s *currentRoomStateStatements) upsertRoomState( + txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64, +) error { + _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( + event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt, + ) + return err +} + +func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { + rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() + return rowsToStreamEvents(rows) +} + +func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { var result []gomatrixserverlib.Event for rows.Next() { var eventBytes []byte @@ -156,15 +195,3 @@ func (s *currentRoomStateStatements) selectCurrentState(txn *sql.Tx, roomID stri } return result, nil } - -func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, eventID string) error { - _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) - return err -} - -func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error { - _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( - event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, - ) - return err -} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index b3cc3925..6c8a5263 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -16,7 +16,6 @@ package storage import ( "database/sql" - "fmt" log "github.com/Sirupsen/logrus" "github.com/lib/pq" @@ -193,7 +192,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( return nil, err } defer rows.Close() - events, err := rowsToEvents(rows) + events, err := rowsToStreamEvents(rows) if err != nil { return nil, err } @@ -205,23 +204,19 @@ func (s *outputRoomEventsStatements) selectRecentEvents( // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // from the database. func (s *outputRoomEventsStatements) selectEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { - rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs)) + stmt := s.selectEventsStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + rows, err := stmt.Query(pq.StringArray(eventIDs)) if err != nil { return nil, err } defer rows.Close() - result, err := rowsToEvents(rows) - if err != nil { - return nil, err - } - - if len(result) != len(eventIDs) { - return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(result), len(eventIDs)) - } - return result, nil + return rowsToStreamEvents(rows) } -func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) { +func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { var result []streamEvent for rows.Next() { var ( diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index dee0b51c..ce2e7f23 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -17,6 +17,7 @@ package storage import ( "database/sql" "encoding/json" + "fmt" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/clientapi/events" @@ -75,10 +76,24 @@ func (d *SyncServerDatabase) AllJoinedUsersInRooms() (map[string][]string, error return d.roomstate.selectJoinedUsers() } +// Events lookups a list of event by their event ID. +// Returns a list of events matching the requested IDs found in the database. +// If an event is not found in the database then it will be omitted from the list. +// Returns an error if there was a problem talking with the database +func (d *SyncServerDatabase) Events(eventIDs []string) ([]gomatrixserverlib.Event, error) { + streamEvents, err := d.events.selectEvents(nil, eventIDs) + if err != nil { + return nil, err + } + return streamEventsToEvents(streamEvents), nil +} + // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. -func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) { +func (d *SyncServerDatabase) WriteEvent( + ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, +) (streamPos types.StreamPosition, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { var err error pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) @@ -87,31 +102,19 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve } streamPos = types.StreamPosition(pos) - if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. return nil } - // Update the current room state based on the added/removed state event IDs. - // In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event. - // However, conflict resolution may result in there being different events being added, or even some removed. - if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() { - // common case - return d.updateRoomState(txn, nil, []gomatrixserverlib.Event{*ev}) - } - - // uncommon case: we need to fetch the full event for each event ID mentioned, then update room state - added, err := d.events.selectEvents(txn, addStateEventIDs) - if err != nil { - return err - } - - return d.updateRoomState(txn, removeStateEventIDs, streamEventsToEvents(added)) + return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos) }) return } -func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event) error { +func (d *SyncServerDatabase) updateRoomState( + txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event, streamPos types.StreamPosition, +) error { // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. for _, eventID := range removedEventIDs { if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil { @@ -132,7 +135,7 @@ func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []stri } membership = &memberContent.Membership } - if err := d.roomstate.upsertRoomState(txn, event, membership); err != nil { + if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil { return err } } @@ -310,7 +313,7 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma for _, missingEvIDs := range missingEvents { allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) } - evs, err := d.events.selectEvents(txn, allMissingEventIDs) + evs, err := d.fetchMissingStateEvents(txn, allMissingEventIDs) if err != nil { return nil, err } @@ -323,6 +326,45 @@ func (d *SyncServerDatabase) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet ma return stateBetween, nil } +func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { + // Fetch from the events table first so we pick up the stream ID for the + // event. + events, err := d.events.selectEvents(txn, eventIDs) + if err != nil { + return nil, err + } + + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range eventIDs { + if !have[eventID] { + missing = append(missing, eventID) + } + } + if len(missing) == 0 { + return events, nil + } + + // If they are missing from the events table then they should be state + // events that we received from outside the main event stream. + // These should be in the room state table. + stateEvents, err := d.roomstate.selectEventsWithEventIDs(txn, missing) + + if err != nil { + return nil, err + } + if len(stateEvents) != len(missing) { + return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) + } + for _, e := range stateEvents { + events = append(events, e) + } + return events, nil +} + func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response