diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 4a50b9a0..793d1e23 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -40,7 +40,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync const insertEventInTopologySQL = "" + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" + " VALUES ($1, $2, $3)" + - " ON CONFLICT DO NOTHING" + " ON CONFLICT (topological_position, room_id) DO UPDATE SET event_id = $1" const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 621aec95..f391c578 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -602,7 +602,6 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( recentStreamEvents, err = d.events.selectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition, numRecentEventsPerRoom, true, true, - //ctx, txn, roomID, 0, toPos.PDUPosition, numRecentEventsPerRoom, ) if err != nil { return @@ -618,7 +617,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( if backwardTopologyPos-1 <= 0 { backwardTopologyPos = types.StreamPosition(1) } else { - backwardTopologyPos = backwardTopologyPos - 1 + backwardTopologyPos-- } // We don't include a device here as we don't need to send down @@ -767,12 +766,9 @@ func (d *SyncServerDatasource) addInvitesToResponse( func (d *SyncServerDatasource) getBackwardTopologyPos( ctx context.Context, events []types.StreamEvent, -) (pos types.StreamPosition, err error) { +) (pos types.StreamPosition) { if len(events) > 0 { - pos, err = d.topology.selectPositionInTopology(ctx, events[0].EventID()) - if err != nil { - return - } + pos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID()) } if pos-1 <= 0 { pos = types.StreamPosition(1) @@ -811,12 +807,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( } recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - - var backwardTopologyPos types.StreamPosition - backwardTopologyPos, err = d.getBackwardTopologyPos(ctx, recentStreamEvents) - if err != nil { - return err - } + backwardTopologyPos := d.getBackwardTopologyPos(ctx, recentStreamEvents) switch delta.membership { case gomatrixserverlib.Join: