diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 72c306d4..67de6df7 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -245,24 +245,20 @@ func (r *messagesReq) retrieveEvents() ( // change the way topological positions are defined (as depth isn't the most // reliable way to define it), it would be easier and less troublesome to // only have to change it in one place, i.e. the database. - startPos, startStreamPos, err := r.db.EventPositionInTopology( + start, err = r.db.EventPositionInTopology( r.ctx, events[0].EventID(), ) if err != nil { err = fmt.Errorf("EventPositionInTopology: for start event %s: %w", events[0].EventID(), err) return } - endPos, endStreamPos, err := r.db.EventPositionInTopology( + end, err = r.db.EventPositionInTopology( r.ctx, events[len(events)-1].EventID(), ) if err != nil { err = fmt.Errorf("EventPositionInTopology: for end event %s: %w", events[len(events)-1].EventID(), err) return } - // Generate pagination tokens to send to the client using the positions - // retrieved previously. - start = types.NewTopologyToken(startPos, startStreamPos) - end = types.NewTopologyToken(endPos, endStreamPos) if r.backwardOrdering { // A stream/topological position is a cursor located between two events. @@ -431,14 +427,10 @@ func setToDefault( ) (to types.TopologyToken, err error) { if backwardOrdering { // go 1 earlier than the first event so we correctly fetch the earliest event + // this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound. to = types.NewTopologyToken(0, 0) } else { - var depth, stream types.StreamPosition - depth, stream, err = db.MaxTopologicalPosition(ctx, roomID) - if err != nil { - return - } - to = types.NewTopologyToken(depth, stream) + to, err = db.MaxTopologicalPosition(ctx, roomID) } return diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 63af1136..22716789 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -93,15 +93,12 @@ type Database interface { // GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit. GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // EventPositionInTopology returns the depth and stream position of the given event. - EventPositionInTopology(ctx context.Context, eventID string) (depth types.StreamPosition, stream types.StreamPosition, err error) - // EventsAtTopologicalPosition returns all of the events matching a given - // position in the topology of a given room. - EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) + EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error) // BackwardExtremitiesForRoom returns the event IDs of all of the backward // extremities we know of for a given room. BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) // MaxTopologicalPosition returns the highest topological position for a given room. - MaxTopologicalPosition(ctx context.Context, roomID string) (depth types.StreamPosition, stream types.StreamPosition, err error) + MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error) // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 48ebcf25..8a9cc49c 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -72,17 +72,12 @@ const selectMaxPositionInTopologySQL = "" + "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" + ") ORDER BY stream_position DESC LIMIT 1" -const selectEventIDsFromPositionSQL = "" + - "SELECT event_id FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1 AND topological_position = $2" - type outputRoomEventsTopologyStatements struct { insertEventInTopologyStmt *sql.Stmt selectEventIDsInRangeASCStmt *sql.Stmt selectEventIDsInRangeDESCStmt *sql.Stmt selectPositionInTopologyStmt *sql.Stmt selectMaxPositionInTopologyStmt *sql.Stmt - selectEventIDsFromPositionStmt *sql.Stmt } func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -106,9 +101,6 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { return nil, err } - if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil { - return nil, err - } return s, nil } @@ -127,7 +119,7 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( // given range in a given room's topological order. // Returns an empty slice if no events match the given range. func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( - ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool, ) (eventIDs []string, err error) { // Decide on the selection's order according to whether chronological order @@ -140,7 +132,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( } // Query the event IDs. - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit) if err == sql.ErrNoRows { // If no event matched the request, return an empty slice. return []string{}, nil @@ -176,28 +168,3 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } - -// SelectEventIDsFromPosition returns the IDs of all events that have a given -// position in the topology of a given room. -func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition( - ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition, -) (eventIDs []string, err error) { - // Query the event IDs. - rows, err := s.selectEventIDsFromPositionStmt.QueryContext(ctx, roomID, pos) - if err == sql.ErrNoRows { - // If no event matched the request, return an empty slice. - return []string{}, nil - } else if err != nil { - return - } - defer common.CloseAndLogIfError(ctx, rows, "selectEventIDsFromPosition: rows.close() failed") - // Return the IDs. - var eventID string - for rows.Next() { - if err = rows.Scan(&eventID); err != nil { - return - } - eventIDs = append(eventIDs, eventID) - } - return eventIDs, rows.Err() -} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 96e9ff61..e03a6b9f 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -319,25 +319,25 @@ func (d *Database) GetEventsInTopologicalRange( roomID string, limit int, backwardOrdering bool, ) (events []types.StreamEvent, err error) { - // Determine the backward and forward limit, i.e. the upper and lower - // limits to the selection in the room's topology, from the direction. - var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition + var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition if backwardOrdering { - // Backward ordering is antichronological (latest event to oldest - // one). - backwardLimit = to.Depth() - forwardLimit = from.Depth() - forwardMicroLimit = from.PDUPosition() + // Backward ordering means the 'from' token has a higher depth than the 'to' token + minDepth = to.Depth() + maxDepth = from.Depth() + // for cases where we have say 5 events with the same depth, the TopologyToken needs to + // know which of the 5 the client has seen. This is done by using the PDU position. + // Events with the same maxDepth but less than this PDU position will be returned. + maxStreamPosForMaxDepth = from.PDUPosition() } else { - // Forward ordering is chronological (oldest event to latest one). - backwardLimit = from.Depth() - forwardLimit = to.Depth() + // Forward ordering means the 'from' token has a lower depth than the 'to' token. + minDepth = from.Depth() + maxDepth = to.Depth() } // Select the event IDs from the defined range. var eIDs []string eIDs, err = d.Topology.SelectEventIDsInRange( - ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, + ctx, nil, roomID, minDepth, maxDepth, maxStreamPosForMaxDepth, limit, !backwardOrdering, ) if err != nil { return @@ -350,7 +350,7 @@ func (d *Database) GetEventsInTopologicalRange( func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) { err = common.WithTransaction(d.DB, func(txn *sql.Tx) error { - pos, err := d.SyncPositionTx(ctx, txn) + pos, err := d.syncPositionTx(ctx, txn) if err != nil { return err } @@ -368,29 +368,25 @@ func (d *Database) BackwardExtremitiesForRoom( func (d *Database) MaxTopologicalPosition( ctx context.Context, roomID string, -) (depth types.StreamPosition, stream types.StreamPosition, err error) { - return d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID) -} - -func (d *Database) EventsAtTopologicalPosition( - ctx context.Context, roomID string, pos types.StreamPosition, -) ([]types.StreamEvent, error) { - eIDs, err := d.Topology.SelectEventIDsFromPosition(ctx, nil, roomID, pos) +) (types.TopologyToken, error) { + depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID) if err != nil { - return nil, err + return types.NewTopologyToken(0, 0), err } - - return d.OutputEvents.SelectEvents(ctx, nil, eIDs) + return types.NewTopologyToken(depth, streamPos), nil } func (d *Database) EventPositionInTopology( ctx context.Context, eventID string, -) (depth types.StreamPosition, stream types.StreamPosition, err error) { - return d.Topology.SelectPositionInTopology(ctx, nil, eventID) +) (types.TopologyToken, error) { + depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID) + if err != nil { + return types.NewTopologyToken(0, 0), err + } + return types.NewTopologyToken(depth, stream), nil } -// TODO FIXME TEMPORARY PUBLIC -func (d *Database) SyncPositionTx( +func (d *Database) syncPositionTx( ctx context.Context, txn *sql.Tx, ) (sp types.StreamingToken, err error) { @@ -466,7 +462,7 @@ func (d *Database) addPDUDeltaToResponse( } // TODO: This should be done in getStateDeltas - if err = d.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { return nil, err } @@ -510,8 +506,7 @@ func (d *Database) addTypingDeltaToResponse( // addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if // the positions of that type are not equal in fromPos and toPos. -// TODO FIXME TEMPORARY PUBLIC -func (d *Database) AddEDUDeltaToResponse( +func (d *Database) addEDUDeltaToResponse( fromPos, toPos types.StreamingToken, joinedRoomIDs []string, res *types.Response, @@ -551,7 +546,7 @@ func (d *Database) IncrementalSync( return nil, err } - err = d.AddEDUDeltaToResponse( + err = d.addEDUDeltaToResponse( fromPos, toPos, joinedRoomIDs, res, ) if err != nil { @@ -590,7 +585,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( }() // Get the current sync position which we will base the sync response on. - toPos, err = d.SyncPositionTx(ctx, txn) + toPos, err = d.syncPositionTx(ctx, txn) if err != nil { return } @@ -649,7 +644,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( res.Rooms.Join[roomID] = *jr } - if err = d.AddInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { return } @@ -668,7 +663,7 @@ func (d *Database) CompleteSync( } // Use a zero value SyncPosition for fromPos so all EDU states are added. - err = d.AddEDUDeltaToResponse( + err = d.addEDUDeltaToResponse( types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res, ) if err != nil { @@ -688,8 +683,7 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -// TODO FIXME temporary public -func (d *Database) AddInvitesToResponse( +func (d *Database) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, fromPos, toPos types.StreamPosition, diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index d3e88a54..c1647a14 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -306,9 +306,6 @@ func (s *outputRoomEventsStatements) InsertEvent( return } -// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'. -// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude -// from sync. func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, @@ -341,8 +338,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( return events, nil } -// selectEarlyEvents returns the earliest events in the given room, starting -// from a given position, up to a maximum of 'limit'. func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index 4469f5b7..f25ac623 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -65,17 +65,12 @@ const selectMaxPositionInTopologySQL = "" + "SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" + " WHERE room_id = $1 ORDER BY stream_position DESC" -const selectEventIDsFromPositionSQL = "" + - "SELECT event_id FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1 AND topological_position = $2" - type outputRoomEventsTopologyStatements struct { insertEventInTopologyStmt *sql.Stmt selectEventIDsInRangeASCStmt *sql.Stmt selectEventIDsInRangeDESCStmt *sql.Stmt selectPositionInTopologyStmt *sql.Stmt selectMaxPositionInTopologyStmt *sql.Stmt - selectEventIDsFromPositionStmt *sql.Stmt } func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -99,9 +94,6 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { return nil, err } - if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil { - return nil, err - } return s, nil } @@ -117,12 +109,9 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( return } -// selectEventIDsInRange selects the IDs of events which positions are within a -// given range in a given room's topological order. -// Returns an empty slice if no events match the given range. func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( ctx context.Context, txn *sql.Tx, roomID string, - fromPos, toPos, toMicroPos types.StreamPosition, + minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool, ) (eventIDs []string, err error) { // Decide on the selection's order according to whether chronological order @@ -135,7 +124,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( } // Query the event IDs. - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit) if err == sql.ErrNoRows { // If no event matched the request, return an empty slice. return []string{}, nil @@ -172,28 +161,3 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } - -// selectEventIDsFromPosition returns the IDs of all events that have a given -// position in the topology of a given room. -func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition( - ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition, -) (eventIDs []string, err error) { - // Query the event IDs. - stmt := common.TxStmt(txn, s.selectEventIDsFromPositionStmt) - rows, err := stmt.QueryContext(ctx, roomID, pos) - if err == sql.ErrNoRows { - // If no event matched the request, return an empty slice. - return []string{}, nil - } else if err != nil { - return - } - // Return the IDs. - var eventID string - for rows.Next() { - if err = rows.Scan(&eventID); err != nil { - return - } - eventIDs = append(eventIDs, eventID) - } - return -} diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index bffcfd05..69ac5e6f 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -292,11 +292,10 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { db := MustCreateDatabase(t) events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB) MustWriteEvents(t, db, events) - latest, latestStream, err := db.MaxTopologicalPosition(ctx, testRoomID) + from, err := db.MaxTopologicalPosition(ctx, testRoomID) if err != nil { t.Fatalf("failed to get MaxTopologicalPosition: %s", err) } - from := types.NewTopologyToken(latest, latestStream) // head towards the beginning of time to := types.NewTopologyToken(0, 0) @@ -358,16 +357,14 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { Depth: depth + 1, })) MustWriteEvents(t, db, events) - latestPos, latestStreamPos, err := db.EventPositionInTopology(ctx, events[len(events)-1].EventID()) + fromLatest, err := db.EventPositionInTopology(ctx, events[len(events)-1].EventID()) if err != nil { t.Fatalf("failed to get EventPositionInTopology: %s", err) } - topoPos, streamPos, err := db.EventPositionInTopology(ctx, events[len(events)-3].EventID()) // Message 2 + fromFork, err := db.EventPositionInTopology(ctx, events[len(events)-3].EventID()) // Message 2 if err != nil { t.Fatalf("failed to get EventPositionInTopology for event: %s", err) } - fromLatest := types.NewTopologyToken(latestPos, latestStreamPos) - fromFork := types.NewTopologyToken(topoPos, streamPos) // head towards the beginning of time to := types.NewTopologyToken(0, 0) @@ -507,12 +504,10 @@ func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatr } func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *types.TopologyToken { - pos, spos, err := db.EventPositionInTopology(ctx, eventID) + tok, err := db.EventPositionInTopology(ctx, eventID) if err != nil { t.Fatalf("failed to get EventPositionInTopology: %s", err) } - - tok := types.NewTopologyToken(pos, spos) tok.Decrement() return &tok } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 8f0b8b89..1e5351b5 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -11,13 +11,15 @@ import ( type AccountData interface { InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error) - SelectAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) + // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. The range is exclusive of `lowPos` and inclusive of `hiPos`. + SelectAccountDataInRange(ctx context.Context, userID string, lowPos, hiPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error) } type Invites interface { InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error) DeleteInviteEvent(ctx context.Context, inviteEventID string) error + // SelectInviteEventsInRange returns a map of room ID to invite events. The range is exclusive of `startPos` and inclusive of `endPos`. SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition) (map[string]gomatrixserverlib.HeaderedEvent, error) SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error) } @@ -26,26 +28,30 @@ type Events interface { SelectStateInRange(ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error) SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error) InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) + // SelectRecentEvents returns events between the two stream positions: exclusive of `fromPos` and inclusive of `toPos`. + // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. + // Returns up to `limit` events. SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error) + // SelectEarlyEvents returns the earliest events in the given room, exclusive of `fromPos` and inclusive of `toPos`. SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) } +// Topology keeps track of the depths and stream positions for all events. +// These positions are used as types.TopologyToken when backfilling events locally. type Topology interface { - // InsertEventInTopology inserts the given event in the room's topology, based - // on the event's depth. + // InsertEventInTopology inserts the given event in the room's topology, based on the event's depth. + // `pos` is the stream position of this event in the events table, and is used to order events which have the same depth. InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error) - // SelectEventIDsInRange selects the IDs of events which positions are within a - // given range in a given room's topological order. + // SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order. + // Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`. + // `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned. // Returns an empty slice if no events match the given range. - SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error) - // SelectPositionInTopology returns the position of a given event in the - // topology of the room it belongs to. - SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos, spos types.StreamPosition, err error) - SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (pos types.StreamPosition, spos types.StreamPosition, err error) - // SelectEventIDsFromPosition returns the IDs of all events that have a given - // position in the topology of a given room. - SelectEventIDsFromPosition(ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition) (eventIDs []string, err error) + SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error) + // SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to. + SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error) + // SelectMaxPositionInTopology returns the event which has the highest depth, and if there are multiple, the event with the highest stream position. + SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error) } type CurrentRoomState interface { diff --git a/syncapi/types/types.go b/syncapi/types/types.go index c1b6d7dd..8a79ccd4 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -119,8 +119,11 @@ func (t *TopologyToken) Decrement() { depth := t.Positions[0] pduPos := t.Positions[1] if depth-1 <= 0 { + // nothing can be lower than this depth = 1 } else { + // this assumes that we will never have 1000 events all with the same + // depth. TODO: work out what the right PDU position is to use, probably needs a db hit. depth-- pduPos += 1000 }