diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index b4d7ccbd..20fa8a4e 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -35,6 +35,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +const ( + membershipJoin = "join" + membershipLeave = "leave" + membershipBan = "ban" +) + type stateDelta struct { roomID string stateEvents []gomatrixserverlib.Event @@ -248,14 +254,12 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + deltas, joinedRoomIDs, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) if err != nil { return nil, err } - joinedRoomIDs := make([]string, 0, len(deltas)) for _, delta := range deltas { - joinedRoomIDs = append(joinedRoomIDs, delta.roomID) err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err @@ -344,7 +348,7 @@ func (d *SyncServerDatasource) IncrementalSync( ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( - ctx, nil, device.UserID, "join", + ctx, nil, device.UserID, membershipJoin, ) } if err != nil { @@ -393,7 +397,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( res = types.NewResponse(toPos) // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { return } @@ -571,7 +575,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( res *types.Response, ) error { endPos := toPos - if delta.membershipPos > 0 && delta.membership == "leave" { + if delta.membershipPos > 0 && delta.membership == membershipLeave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). @@ -595,7 +599,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( } switch delta.membership { - case "join": + case membershipJoin: jr := types.NewJoinResponse() if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { // Use the short form of batch token for prev_batch @@ -608,9 +612,9 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr - case "leave": + case membershipLeave: fallthrough // transitions to leave are the same as ban - case "ban": + case membershipBan: // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() @@ -716,10 +720,14 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( return events, nil } +// getStateDeltas returns the state deltas between fromPos and toPos, +// exclusive of oldPos, inclusive of newPos, for the rooms in which +// the user has new membership events. +// A list of joined room IDs is also returned in case the caller needs it. func (d *SyncServerDatasource) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, fromPos, toPos int64, userID string, -) ([]stateDelta, error) { +) ([]stateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -733,11 +741,11 @@ func (d *SyncServerDatasource) getStateDeltas( // get all the state events ever between these two positions stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) if err != nil { - return nil, err + return nil, nil, err } state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) if err != nil { - return nil, err + return nil, nil, err } for roomID, stateStreamEvents := range state { @@ -748,12 +756,12 @@ func (d *SyncServerDatasource) getStateDeltas( // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the timeline. if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { - if membership == "join" { + if membership == membershipJoin { // send full room state down instead of a delta var allState []gomatrixserverlib.Event allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) if err != nil { - return nil, err + return nil, nil, err } s := make([]streamEvent, len(allState)) for i := 0; i < len(s); i++ { @@ -775,19 +783,19 @@ func (d *SyncServerDatasource) getStateDeltas( } // Add in currently joined rooms - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, membershipJoin) if err != nil { - return nil, err + return nil, nil, err } for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, stateDelta{ - membership: "join", + membership: membershipJoin, stateEvents: streamEventsToEvents(device, state[joinedRoomID]), roomID: joinedRoomID, }) } - return deltas, nil + return deltas, joinedRoomIDs, nil } // streamEventsToEvents converts streamEvent to Event. If device is non-nil and