diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 47520ff2..5d0bdacc 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -113,8 +113,7 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) } // IncrementalSync returns all the data needed in order to create an incremental sync response. -func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") if err != nil { @@ -126,6 +125,8 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types return err } + res = types.NewResponse(toPos) + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -167,35 +168,40 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types return err } state[roomID] = removeDuplicates(state[roomID], recentEvents) - roomData := types.RoomData{ - State: state[roomID], - RecentEvents: recentEvents, - } - data[roomID] = roomData + + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } return nil }) return } -// CompleteSync returns all the data needed in order to create a complete sync response. -func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +// CompleteSync a complete /sync API response for the given user. +func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // a consistent view of the database throughout. This includes extracting the sync stream position. + // This does have the unfortunate side-effect that all the matrixy logic resides in this function, + // but it's better to not hide the fact that this is being done in a transaction. returnErr = runTransaction(d.db, func(txn *sql.Tx) error { // Get the current stream position which we will base the sync response on. id, err := d.events.MaxID(txn) if err != nil { return err } - pos = types.StreamPosition(id) + pos := types.StreamPosition(id) // Extract room state and recent events for all rooms the user is joined to. roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") if err != nil { return err } + + // Build up a /sync response + res = types.NewResponse(pos) for _, roomID := range roomIDs { stateEvents, err := d.roomstate.CurrentState(txn, roomID) if err != nil { @@ -209,12 +215,13 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom } stateEvents = removeDuplicates(stateEvents, recentEvents) - - data[roomID] = types.RoomData{ - State: stateEvents, - RecentEvents: recentEvents, - } + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } + // TODO: Add invites! return nil }) return diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index c9c41566..8e9affb6 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -102,36 +101,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) { - if req.since == types.StreamPosition(0) { - pos, data, err := rp.db.CompleteSync(req.userID, req.limit) - if err != nil { - return nil, err - } - res := types.NewResponse(pos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil - } - // TODO: handle ignored users - - data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) - if err != nil { - return nil, err + if req.since == types.StreamPosition(0) { + return rp.db.CompleteSync(req.userID, req.limit) } - - res := types.NewResponse(currentPos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil + return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index 8c112936..588737de 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -28,12 +28,6 @@ func (sp StreamPosition) String() string { return strconv.FormatInt(int64(sp), 10) } -// RoomData represents the data for a room suitable for building a sync response from. -type RoomData struct { - State []gomatrixserverlib.Event - RecentEvents []gomatrixserverlib.Event -} - // Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync type Response struct { NextBatch string `json:"next_batch"`