Implement incremental sync (#71)
This has the following known shortcomings: - It doesn't handle missing events not in the from/to range. - It doesn't order events in the timeline correctly. - It doesn't handle invited/left rooms at all. - There is no intelligent wakeup logic: events for Bob will wake up Alice's stream.main
parent
f18d935134
commit
92a0325266
|
@ -0,0 +1,61 @@
|
|||
# Sync Server
|
||||
|
||||
This server is responsible for servicing `/sync` requests. It gets its data from the room server output log.
|
||||
|
||||
## Internals
|
||||
|
||||
When the server gets a `/sync` request, it needs to:
|
||||
- Work out *which* rooms to return to the client.
|
||||
- For each room, work out *which* events to return to the client.
|
||||
|
||||
The logic for working out which rooms is based on [Synapse](https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821):
|
||||
1) Get the CURRENT joined room list for this user.
|
||||
2) Get membership list changes for this user between the provided stream position and now.
|
||||
3) For each room which has membership list changes:
|
||||
- Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins).
|
||||
If it is, then we need to send the full room state down (and 'limited' is always true).
|
||||
- Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||
- Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block.
|
||||
4) Add joined rooms (joined room list)
|
||||
|
||||
For each room, the /sync response returns the most recent timeline events and the state of the room at the start of the timeline.
|
||||
The logic for working out *which* events is not based entirely on Synapse code, as it is known broken with respect to working out
|
||||
room state. In order to know which events to return, the server needs to calculate room state at various points in the history of
|
||||
the room. For example, imagine a room with the following 15 events (letters are state events (updated via `'`), numbers are timeline events):
|
||||
|
||||
```
|
||||
index 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 (1-based indexing as StreamPosition(0) represents no event)
|
||||
timeline [A, B, C, D, 1, 2, 3, D', 4, D'', 5, B', D''', D'''', 6]
|
||||
```
|
||||
|
||||
The current state of this room is: `[A, B', C, D'''']`.
|
||||
|
||||
If this room was requested with `?since=14&limit=5` then 1 timeline event would be returned, the most recent one:
|
||||
```
|
||||
15
|
||||
[ 6 ]
|
||||
```
|
||||
|
||||
If this room was requested with `?since=9&limit=5` then 5 timeline events would be returned, the most recent ones:
|
||||
```
|
||||
11 12 13 14 15
|
||||
[5, B', D''', D'''', 6]
|
||||
```
|
||||
|
||||
The state of the room at the START of the timeline can be represented in 2 ways:
|
||||
- The `full_state` from index 0 : `[A, B, C, D'']` (aka the state between 0-11 exclusive)
|
||||
- A partial state from index 9 : `[D'']` (aka the state between 9-11 exclusive)
|
||||
|
||||
Servers advance state events (e.g from `D'` to `D''`) based on the state conflict resolution algorithm.
|
||||
You might think that you could advance the current state by just updating the entry for the `(event type, state_key)` tuple
|
||||
for each state event, but this state can diverge from the state calculated using the state conflict resolution algorithm.
|
||||
For example, if there are two "simultaneous" updates to the same state key, that is two updates at the same depth in the
|
||||
event graph, then the final result of the state conflict resolution algorithm might not match the order the events appear
|
||||
in the timeline.
|
||||
|
||||
The correct advancement for state events is represented by the `AddsStateEventIDs` and `RemovesStateEventIDs` that
|
||||
are in `OutputRoomEvents` from the room server.
|
||||
|
||||
This version of the sync server uses very simple indexing to calculate room state at various points.
|
||||
This is inefficient when a very old `since` value is provided, or the `full_state` is requested, as the state delta becomes
|
||||
very large. This is mitigated slightly with indexes, but better data structures could be used in the future.
|
|
@ -4,7 +4,9 @@ import (
|
|||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/syncserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
@ -40,17 +42,24 @@ const selectEventsInRangeSQL = "" +
|
|||
"SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2"
|
||||
|
||||
const selectRecentEventsSQL = "" +
|
||||
"SELECT event_json FROM output_room_events WHERE room_id = $1 ORDER BY id DESC LIMIT $2"
|
||||
"SELECT event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4"
|
||||
|
||||
const selectMaxIDSQL = "" +
|
||||
"SELECT MAX(id) FROM output_room_events"
|
||||
|
||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" +
|
||||
" WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
" ORDER BY id ASC"
|
||||
|
||||
type outputRoomEventsStatements struct {
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
selectMaxIDStmt *sql.Stmt
|
||||
selectEventsInRangeStmt *sql.Stmt
|
||||
selectRecentEventsStmt *sql.Stmt
|
||||
selectStateInRangeStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||
|
@ -73,8 +82,101 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// StateBetween returns the state events between the two given stream positions, exclusive of both.
|
||||
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
||||
// two positions, only the most recent state is returned.
|
||||
func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) {
|
||||
rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Fetch all the state change events for all rooms between the two positions then loop each event and:
|
||||
// - Keep a cache of the event by ID (99% of state change events are for the event itself)
|
||||
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes
|
||||
// For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID
|
||||
// if they aren't in the event ID cache. We don't handle state deletion yet.
|
||||
eventIDToEvent := make(map[string]gomatrixserverlib.Event)
|
||||
|
||||
// RoomID => A set (map[string]bool) of state event IDs which are between the two positions
|
||||
stateNeeded := make(map[string]map[string]bool)
|
||||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
addIDs pq.StringArray
|
||||
delIDs pq.StringArray
|
||||
)
|
||||
if err := rows.Scan(&eventBytes, &addIDs, &delIDs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Sanity check for deleted state and whine if we see it. We don't need to do anything
|
||||
// since it'll just mark the event as not being needed.
|
||||
if len(addIDs) < len(delIDs) {
|
||||
log.WithFields(log.Fields{
|
||||
"since": oldPos,
|
||||
"current": newPos,
|
||||
"adds": addIDs,
|
||||
"dels": delIDs,
|
||||
}).Warn("StateBetween: ignoring deleted state")
|
||||
}
|
||||
|
||||
// TODO: Handle redacted events
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
needSet := stateNeeded[ev.RoomID()]
|
||||
if needSet == nil { // make set if required
|
||||
needSet = make(map[string]bool)
|
||||
}
|
||||
for _, id := range delIDs {
|
||||
needSet[id] = false
|
||||
}
|
||||
for _, id := range addIDs {
|
||||
needSet[id] = true
|
||||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
|
||||
eventIDToEvent[ev.EventID()] = ev
|
||||
}
|
||||
|
||||
stateBetween, missingEvents := mapEventIDsToEvents(eventIDToEvent, stateNeeded)
|
||||
|
||||
if len(missingEvents) > 0 {
|
||||
return nil, fmt.Errorf("error StateBetween: TODO missing events")
|
||||
}
|
||||
return stateBetween, nil
|
||||
}
|
||||
|
||||
// convert the set of event IDs into a set of events. Mark any which are missing.
|
||||
func mapEventIDsToEvents(eventIDToEvent map[string]gomatrixserverlib.Event, stateNeeded map[string]map[string]bool) (map[string][]gomatrixserverlib.Event, map[string][]string) {
|
||||
stateBetween := make(map[string][]gomatrixserverlib.Event)
|
||||
missingEvents := make(map[string][]string)
|
||||
for roomID, ids := range stateNeeded {
|
||||
events := stateBetween[roomID]
|
||||
for id, need := range ids {
|
||||
if !need {
|
||||
continue // deleted state
|
||||
}
|
||||
e, ok := eventIDToEvent[id]
|
||||
if ok {
|
||||
events = append(events, e)
|
||||
} else {
|
||||
m := missingEvents[roomID]
|
||||
m = append(m, id)
|
||||
missingEvents[roomID] = m
|
||||
}
|
||||
}
|
||||
stateBetween[roomID] = events
|
||||
}
|
||||
return stateBetween, missingEvents
|
||||
}
|
||||
|
||||
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
|
||||
// then this function should only ever be used at startup, as it will race with inserting events if it is
|
||||
|
@ -92,27 +194,6 @@ func (s *outputRoomEventsStatements) MaxID(txn *sql.Tx) (id int64, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// InRange returns all the events in the range between oldPos exclusive and newPos inclusive. Returns an empty array if
|
||||
// there are no events between the provided range. Returns an error if events are missing in the range.
|
||||
func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) {
|
||||
rows, err := s.selectEventsInRangeStmt.Query(oldPos, newPos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result, err := rowsToEvents(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Expect one event per position, exclusive of old. eg old=3, new=5, expect 4,5 so 2 events.
|
||||
wantNum := int(newPos - oldPos)
|
||||
if len(result) != wantNum {
|
||||
return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", len(result), wantNum)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
|
||||
// of the inserted event.
|
||||
func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) (streamPos int64, err error) {
|
||||
|
@ -123,8 +204,8 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser
|
|||
}
|
||||
|
||||
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
|
||||
func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, limit int) ([]gomatrixserverlib.Event, error) {
|
||||
rows, err := s.selectRecentEventsStmt.Query(roomID, limit)
|
||||
func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]gomatrixserverlib.Event, error) {
|
||||
rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -96,6 +96,36 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
|
|||
return types.StreamPosition(id), nil
|
||||
}
|
||||
|
||||
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
||||
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) {
|
||||
data = make(map[string]types.RoomData)
|
||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
state, err := d.events.StateBetween(txn, fromPos, toPos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, roomID := range roomIDs {
|
||||
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roomData := types.RoomData{
|
||||
State: state[roomID],
|
||||
RecentEvents: recentEvents,
|
||||
}
|
||||
data[roomID] = roomData
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// CompleteSync returns all the data needed in order to create a complete sync response.
|
||||
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
|
||||
data = make(map[string]types.RoomData)
|
||||
|
@ -121,7 +151,7 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
|
|||
}
|
||||
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
||||
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
||||
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, numRecentEventsPerRoom)
|
||||
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -135,11 +165,6 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
|
|||
return
|
||||
}
|
||||
|
||||
// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos.
|
||||
func (d *SyncServerDatabase) EventsInRange(oldPos, newPos types.StreamPosition) ([]gomatrixserverlib.Event, error) {
|
||||
return d.events.InRange(int64(oldPos), int64(newPos))
|
||||
}
|
||||
|
||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||
txn, err := db.Begin()
|
||||
if err != nil {
|
||||
|
|
|
@ -139,31 +139,18 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err
|
|||
|
||||
// TODO: handle ignored users
|
||||
|
||||
// TODO: Implement https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||
// 1) Get the CURRENT joined room list for this user
|
||||
// 2) Get membership list changes for this user between the provided stream position and now.
|
||||
// 3) For each room which has membership list changes:
|
||||
// a) Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins).
|
||||
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
||||
// b) Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||
// c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch?
|
||||
// 4) Add joined rooms (joined room list)
|
||||
|
||||
evs, err := rp.db.EventsInRange(req.since, currentPos)
|
||||
data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := types.NewResponse(currentPos)
|
||||
// for now, dump everything as join timeline events
|
||||
for _, ev := range evs {
|
||||
roomData := res.Rooms.Join[ev.RoomID()]
|
||||
roomData.Timeline.Events = append(roomData.Timeline.Events, gomatrixserverlib.ToClientEvent(ev, gomatrixserverlib.FormatSync))
|
||||
res.Rooms.Join[ev.RoomID()] = roomData
|
||||
for roomID, d := range data {
|
||||
jr := types.NewJoinResponse()
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
|
||||
res.Rooms.Join[roomID] = *jr
|
||||
}
|
||||
|
||||
// Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
|
||||
// as an integer even though (at the moment) it is.
|
||||
res.NextBatch = currentPos.String()
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"strconv"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// StreamPosition represents the offset in the sync stream a client is at.
|
||||
|
@ -38,6 +39,8 @@ type Response struct {
|
|||
// NewResponse creates an empty response with initialised maps.
|
||||
func NewResponse(pos StreamPosition) *Response {
|
||||
res := Response{}
|
||||
// Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
|
||||
// as an integer even though (at the moment) it is.
|
||||
res.NextBatch = pos.String()
|
||||
// Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section,
|
||||
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
|
||||
|
|
Loading…
Reference in New Issue