Factor out addRoomDeltaToResponse from IncrementalSync (#306)
parent
7b0f62208f
commit
5a6a950ee8
|
@ -229,44 +229,10 @@ func (d *SyncServerDatabase) IncrementalSync(
|
||||||
|
|
||||||
res := types.NewResponse(toPos)
|
res := types.NewResponse(toPos)
|
||||||
for _, delta := range deltas {
|
for _, delta := range deltas {
|
||||||
endPos := toPos
|
err = d.addRoomDeltaToResponse(ctx, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
|
||||||
if delta.membershipPos > 0 && delta.membership == "leave" {
|
|
||||||
// make sure we don't leak recent events after the leave event.
|
|
||||||
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
|
|
||||||
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
|
|
||||||
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
|
|
||||||
// in a single /sync request
|
|
||||||
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
|
||||||
endPos = delta.membershipPos
|
|
||||||
}
|
|
||||||
var recentStreamEvents []streamEvent
|
|
||||||
recentStreamEvents, err = d.events.selectRecentEvents(
|
|
||||||
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
recentEvents := streamEventsToEvents(recentStreamEvents)
|
|
||||||
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
|
||||||
|
|
||||||
switch delta.membership {
|
|
||||||
case "join":
|
|
||||||
jr := types.NewJoinResponse()
|
|
||||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
||||||
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Join[delta.roomID] = *jr
|
|
||||||
case "leave":
|
|
||||||
fallthrough // transitions to leave are the same as ban
|
|
||||||
case "ban":
|
|
||||||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
|
||||||
// no longer in the room.
|
|
||||||
lr := types.NewLeaveResponse()
|
|
||||||
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
||||||
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Leave[delta.roomID] = *lr
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This should be done in getStateDeltas
|
// TODO: This should be done in getStateDeltas
|
||||||
|
@ -418,6 +384,55 @@ func (d *SyncServerDatabase) addInvitesToResponse(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// addRoomDeltaToResponse adds a room state delta to a sync response
|
||||||
|
func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
fromPos, toPos types.StreamPosition,
|
||||||
|
delta stateDelta,
|
||||||
|
numRecentEventsPerRoom int,
|
||||||
|
res *types.Response,
|
||||||
|
) error {
|
||||||
|
endPos := toPos
|
||||||
|
if delta.membershipPos > 0 && delta.membership == "leave" {
|
||||||
|
// make sure we don't leak recent events after the leave event.
|
||||||
|
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
|
||||||
|
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
|
||||||
|
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
|
||||||
|
// in a single /sync request
|
||||||
|
// This is all "okay" assuming history_visibility == "shared" which it is by default.
|
||||||
|
endPos = delta.membershipPos
|
||||||
|
}
|
||||||
|
recentStreamEvents, err := d.events.selectRecentEvents(
|
||||||
|
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
recentEvents := streamEventsToEvents(recentStreamEvents)
|
||||||
|
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
||||||
|
|
||||||
|
switch delta.membership {
|
||||||
|
case "join":
|
||||||
|
jr := types.NewJoinResponse()
|
||||||
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
|
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
res.Rooms.Join[delta.roomID] = *jr
|
||||||
|
case "leave":
|
||||||
|
fallthrough // transitions to leave are the same as ban
|
||||||
|
case "ban":
|
||||||
|
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
||||||
|
// no longer in the room.
|
||||||
|
lr := types.NewLeaveResponse()
|
||||||
|
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
|
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
res.Rooms.Leave[delta.roomID] = *lr
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
|
// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database.
|
||||||
// Returns a map of room ID to list of events.
|
// Returns a map of room ID to list of events.
|
||||||
func (d *SyncServerDatabase) fetchStateEvents(
|
func (d *SyncServerDatabase) fetchStateEvents(
|
||||||
|
|
Loading…
Reference in New Issue