From 92a0325266ec90a75f4f819048f42615538b41d6 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 19 Apr 2017 16:04:01 +0100 Subject: [PATCH] Implement incremental sync (#71) This has the following known shortcomings: - It doesn't handle missing events not in the from/to range. - It doesn't order events in the timeline correctly. - It doesn't handle invited/left rooms at all. - There is no intelligent wakeup logic: events for Bob will wake up Alice's stream. --- .../matrix-org/dendrite/syncserver/README.md | 61 +++++++++ .../storage/output_room_events_table.go | 129 ++++++++++++++---- .../dendrite/syncserver/storage/syncserver.go | 37 ++++- .../dendrite/syncserver/sync/requestpool.go | 27 +--- .../dendrite/syncserver/types/types.go | 5 +- 5 files changed, 208 insertions(+), 51 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/syncserver/README.md diff --git a/src/github.com/matrix-org/dendrite/syncserver/README.md b/src/github.com/matrix-org/dendrite/syncserver/README.md new file mode 100644 index 00000000..1fc62b37 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncserver/README.md @@ -0,0 +1,61 @@ +# Sync Server + +This server is responsible for servicing `/sync` requests. It gets its data from the room server output log. + +## Internals + +When the server gets a `/sync` request, it needs to: + - Work out *which* rooms to return to the client. + - For each room, work out *which* events to return to the client. + +The logic for working out which rooms is based on [Synapse](https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821): + 1) Get the CURRENT joined room list for this user. + 2) Get membership list changes for this user between the provided stream position and now. + 3) For each room which has membership list changes: + - Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins). + If it is, then we need to send the full room state down (and 'limited' is always true). + - Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + - Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. + 4) Add joined rooms (joined room list) + +For each room, the /sync response returns the most recent timeline events and the state of the room at the start of the timeline. +The logic for working out *which* events is not based entirely on Synapse code, as it is known broken with respect to working out +room state. In order to know which events to return, the server needs to calculate room state at various points in the history of +the room. For example, imagine a room with the following 15 events (letters are state events (updated via `'`), numbers are timeline events): + +``` +index 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 (1-based indexing as StreamPosition(0) represents no event) +timeline [A, B, C, D, 1, 2, 3, D', 4, D'', 5, B', D''', D'''', 6] +``` + +The current state of this room is: `[A, B', C, D'''']`. + +If this room was requested with `?since=14&limit=5` then 1 timeline event would be returned, the most recent one: +``` + 15 + [ 6 ] +``` + +If this room was requested with `?since=9&limit=5` then 5 timeline events would be returned, the most recent ones: +``` + 11 12 13 14 15 + [5, B', D''', D'''', 6] +``` + +The state of the room at the START of the timeline can be represented in 2 ways: + - The `full_state` from index 0 : `[A, B, C, D'']` (aka the state between 0-11 exclusive) + - A partial state from index 9 : `[D'']` (aka the state between 9-11 exclusive) + +Servers advance state events (e.g from `D'` to `D''`) based on the state conflict resolution algorithm. +You might think that you could advance the current state by just updating the entry for the `(event type, state_key)` tuple +for each state event, but this state can diverge from the state calculated using the state conflict resolution algorithm. +For example, if there are two "simultaneous" updates to the same state key, that is two updates at the same depth in the +event graph, then the final result of the state conflict resolution algorithm might not match the order the events appear +in the timeline. + +The correct advancement for state events is represented by the `AddsStateEventIDs` and `RemovesStateEventIDs` that +are in `OutputRoomEvents` from the room server. + +This version of the sync server uses very simple indexing to calculate room state at various points. +This is inefficient when a very old `since` value is provided, or the `full_state` is requested, as the state delta becomes +very large. This is mitigated slightly with indexes, but better data structures could be used in the future. diff --git a/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go index 5e9eb74d..5d9fba9b 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go @@ -4,7 +4,9 @@ import ( "database/sql" "fmt" + log "github.com/Sirupsen/logrus" "github.com/lib/pq" + "github.com/matrix-org/dendrite/syncserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -40,17 +42,24 @@ const selectEventsInRangeSQL = "" + "SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2" const selectRecentEventsSQL = "" + - "SELECT event_json FROM output_room_events WHERE room_id = $1 ORDER BY id DESC LIMIT $2" + "SELECT event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4" const selectMaxIDSQL = "" + "SELECT MAX(id) FROM output_room_events" +// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). +const selectStateInRangeSQL = "" + + "SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" + + " WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + + " ORDER BY id ASC" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt selectMaxIDStmt *sql.Stmt selectEventsInRangeStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -73,9 +82,102 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { return } + if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { + return + } return } +// StateBetween returns the state events between the two given stream positions, exclusive of both. +// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the +// two positions, only the most recent state is returned. +func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) { + rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos) + if err != nil { + return nil, err + } + // Fetch all the state change events for all rooms between the two positions then loop each event and: + // - Keep a cache of the event by ID (99% of state change events are for the event itself) + // - For each room ID, build up an array of event IDs which represents cumulative adds/removes + // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID + // if they aren't in the event ID cache. We don't handle state deletion yet. + eventIDToEvent := make(map[string]gomatrixserverlib.Event) + + // RoomID => A set (map[string]bool) of state event IDs which are between the two positions + stateNeeded := make(map[string]map[string]bool) + + for rows.Next() { + var ( + eventBytes []byte + addIDs pq.StringArray + delIDs pq.StringArray + ) + if err := rows.Scan(&eventBytes, &addIDs, &delIDs); err != nil { + return nil, err + } + // Sanity check for deleted state and whine if we see it. We don't need to do anything + // since it'll just mark the event as not being needed. + if len(addIDs) < len(delIDs) { + log.WithFields(log.Fields{ + "since": oldPos, + "current": newPos, + "adds": addIDs, + "dels": delIDs, + }).Warn("StateBetween: ignoring deleted state") + } + + // TODO: Handle redacted events + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + needSet := stateNeeded[ev.RoomID()] + if needSet == nil { // make set if required + needSet = make(map[string]bool) + } + for _, id := range delIDs { + needSet[id] = false + } + for _, id := range addIDs { + needSet[id] = true + } + stateNeeded[ev.RoomID()] = needSet + + eventIDToEvent[ev.EventID()] = ev + } + + stateBetween, missingEvents := mapEventIDsToEvents(eventIDToEvent, stateNeeded) + + if len(missingEvents) > 0 { + return nil, fmt.Errorf("error StateBetween: TODO missing events") + } + return stateBetween, nil +} + +// convert the set of event IDs into a set of events. Mark any which are missing. +func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stateNeeded map[string]map[string]bool) (map[string][]gomatrixserverlib.Event, map[string][]string) { + stateBetween := make(map[string][]gomatrixserverlib.Event) + missingEvents := make(map[string][]string) + for roomID, ids := range stateNeeded { + events := stateBetween[roomID] + for id, need := range ids { + if !need { + continue // deleted state + } + e, ok := eventIDToEvent[id] + if ok { + events = append(events, e) + } else { + m := missingEvents[roomID] + m = append(m, id) + missingEvents[roomID] = m + } + } + stateBetween[roomID] = events + } + return stateBetween, missingEvents +} + // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. @@ -92,27 +194,6 @@ func (s *outputRoomEventsStatements) MaxID(txn *sql.Tx) (id int64, err error) { return } -// InRange returns all the events in the range between oldPos exclusive and newPos inclusive. Returns an empty array if -// there are no events between the provided range. Returns an error if events are missing in the range. -func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) { - rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos) - if err != nil { - return nil, err - } - defer rows.Close() - - result, err := rowsToEvents(rows) - if err != nil { - return nil, err - } - // Expect one event per position, exclusive of old. eg old=3, new=5, expect 4,5 so 2 events. - wantNum := int(newPos - oldPos) - if len(result) != wantNum { - return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", len(result), wantNum) - } - return result, nil -} - // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position // of the inserted event. func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) { @@ -123,8 +204,8 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser } // RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, limit int) ([]gomatrixserverlib.Event, error) { - rows, err := s.selectRecentEventsStmt.Query(roomID, limit) +func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]gomatrixserverlib.Event, error) { + rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go index fa2e6a42..61bcbe11 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go @@ -96,6 +96,36 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) return types.StreamPosition(id), nil } +// IncrementalSync returns all the data needed in order to create an incremental sync response. +func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) { + data = make(map[string]types.RoomData) + returnErr = runTransaction(d.db, func(txn *sql.Tx) error { + roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") + if err != nil { + return err + } + + state, err := d.events.StateBetween(txn, fromPos, toPos) + if err != nil { + return err + } + + for _, roomID := range roomIDs { + recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom) + if err != nil { + return err + } + roomData := types.RoomData{ + State: state[roomID], + RecentEvents: recentEvents, + } + data[roomID] = roomData + } + return nil + }) + return +} + // CompleteSync returns all the data needed in order to create a complete sync response. func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) { data = make(map[string]types.RoomData) @@ -121,7 +151,7 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom } // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, numRecentEventsPerRoom) + recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom) if err != nil { return err } @@ -135,11 +165,6 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom return } -// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos. -func (d *SyncServerDatabase) EventsInRange(oldPos, newPos types.StreamPosition) ([]gomatrixserverlib.Event, error) { - return d.events.InRange(int64(oldPos), int64(newPos)) -} - func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go index 15c58845..2fee3be6 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go @@ -139,31 +139,18 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err // TODO: handle ignored users - // TODO: Implement https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 - // 1) Get the CURRENT joined room list for this user - // 2) Get membership list changes for this user between the provided stream position and now. - // 3) For each room which has membership list changes: - // a) Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins). - // If it is, then we need to send the full room state down (and 'limited' is always true). - // b) Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. - // c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch? - // 4) Add joined rooms (joined room list) - - evs, err := rp.db.EventsInRange(req.since, currentPos) + data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) if err != nil { return nil, err } res := types.NewResponse(currentPos) - // for now, dump everything as join timeline events - for _, ev := range evs { - roomData := res.Rooms.Join[ev.RoomID()] - roomData.Timeline.Events = append(roomData.Timeline.Events, gomatrixserverlib.ToClientEvent(ev, gomatrixserverlib.FormatSync)) - res.Rooms.Join[ev.RoomID()] = roomData + for roomID, d := range data { + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } - - // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this - // as an integer even though (at the moment) it is. - res.NextBatch = currentPos.String() return res, nil } diff --git a/src/github.com/matrix-org/dendrite/syncserver/types/types.go b/src/github.com/matrix-org/dendrite/syncserver/types/types.go index 7d904c14..663976d5 100644 --- a/src/github.com/matrix-org/dendrite/syncserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncserver/types/types.go @@ -1,8 +1,9 @@ package types import ( - "github.com/matrix-org/gomatrixserverlib" "strconv" + + "github.com/matrix-org/gomatrixserverlib" ) // StreamPosition represents the offset in the sync stream a client is at. @@ -38,6 +39,8 @@ type Response struct { // NewResponse creates an empty response with initialised maps. func NewResponse(pos StreamPosition) *Response { res := Response{} + // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this + // as an integer even though (at the moment) it is. res.NextBatch = pos.String() // Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section, // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.