From b28674435e3024bf0e4723a9cf53180607f2045e Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 1 May 2020 11:01:34 +0100 Subject: [PATCH] Correctly generate backpagination tokens for events which have the same depth (#996) * Correctly generate backpagination tokens for events which have the same depth With tests. Unfortunately the code around here is hard to understand. There will be a subsequent PR which fixes this up now that we have a test harness in place. * Add postgres impl * More linting * Fix psql statement so it actually works --- syncapi/routing/messages.go | 14 +-- syncapi/storage/interface.go | 6 +- .../postgres/output_room_events_table.go | 27 +++-- .../output_room_events_topology_table.go | 41 ++++--- syncapi/storage/postgres/syncserver.go | 18 ++- .../sqlite3/output_room_events_table.go | 29 +++-- .../output_room_events_topology_table.go | 43 ++++--- syncapi/storage/sqlite3/syncserver.go | 22 ++-- syncapi/storage/storage_test.go | 108 +++++++++++++++++- syncapi/types/types.go | 10 +- 10 files changed, 238 insertions(+), 80 deletions(-) diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 5105e224..d7e39704 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -229,14 +229,14 @@ func (r *messagesReq) retrieveEvents() ( // change the way topological positions are defined (as depth isn't the most // reliable way to define it), it would be easier and less troublesome to // only have to change it in one place, i.e. the database. - startPos, err := r.db.EventPositionInTopology( + startPos, startStreamPos, err := r.db.EventPositionInTopology( r.ctx, events[0].EventID(), ) if err != nil { err = fmt.Errorf("EventPositionInTopology: for start event %s: %w", events[0].EventID(), err) return } - endPos, err := r.db.EventPositionInTopology( + endPos, endStreamPos, err := r.db.EventPositionInTopology( r.ctx, events[len(events)-1].EventID(), ) if err != nil { @@ -246,10 +246,10 @@ func (r *messagesReq) retrieveEvents() ( // Generate pagination tokens to send to the client using the positions // retrieved previously. start = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, startPos, 0, + types.PaginationTokenTypeTopology, startPos, startStreamPos, ) end = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, endPos, 0, + types.PaginationTokenTypeTopology, endPos, endStreamPos, ) if r.backwardOrdering { @@ -407,13 +407,13 @@ func setToDefault( // go 1 earlier than the first event so we correctly fetch the earliest event to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0) } else { - var pos types.StreamPosition - pos, err = db.MaxTopologicalPosition(ctx, roomID) + var pos, stream types.StreamPosition + pos, stream, err = db.MaxTopologicalPosition(ctx, roomID) if err != nil { return } - to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos, 0) + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos, stream) } return diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index bd9504db..7d637643 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -91,8 +91,8 @@ type Database interface { // GetEventsInRange retrieves all of the events on a given ordering using the // given extremities and limit. GetEventsInRange(ctx context.Context, from, to *types.PaginationToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) - // EventPositionInTopology returns the depth of the given event. - EventPositionInTopology(ctx context.Context, eventID string) (types.StreamPosition, error) + // EventPositionInTopology returns the depth and stream position of the given event. + EventPositionInTopology(ctx context.Context, eventID string) (depth types.StreamPosition, stream types.StreamPosition, err error) // EventsAtTopologicalPosition returns all of the events matching a given // position in the topology of a given room. EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error) @@ -100,7 +100,7 @@ type Database interface { // extremities we know of for a given room. BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error) // MaxTopologicalPosition returns the highest topological position for a given room. - MaxTopologicalPosition(ctx context.Context, roomID string) (types.StreamPosition, error) + MaxTopologicalPosition(ctx context.Context, roomID string) (depth types.StreamPosition, stream types.StreamPosition, err error) // StreamEventsToEvents converts streamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 0b53dfa9..769823e9 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -94,6 +94,9 @@ const selectEarlyEventsSQL = "" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id ASC LIMIT $4" +const selectStreamPositionForEventIDSQL = "" + + "SELECT id FROM syncapi_output_room_events WHERE event_id = $1" + const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" @@ -111,13 +114,14 @@ const selectStateInRangeSQL = "" + " LIMIT $8" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectRecentEventsForSyncStmt *sql.Stmt - selectEarlyEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectRecentEventsForSyncStmt *sql.Stmt + selectEarlyEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt + selectStreamPositionForEventIDStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -146,9 +150,18 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { return } + if s.selectStreamPositionForEventIDStmt, err = db.Prepare(selectStreamPositionForEventIDSQL); err != nil { + return + } return } +func (s *outputRoomEventsStatements) selectStreamPositionForEventID(ctx context.Context, eventID string) (types.StreamPosition, error) { + var id int64 + err := s.selectStreamPositionForEventIDStmt.QueryRowContext(ctx, eventID).Scan(&id) + return types.StreamPosition(id), err +} + // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 280d4ec3..f77365c8 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -32,35 +32,44 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology ( -- The place of the event in the room's topology. This can usually be determined -- from the event's depth. topological_position BIGINT NOT NULL, + stream_position BIGINT NOT NULL, -- The 'room_id' key for the event. room_id TEXT NOT NULL ); -- The topological order will be used in events selection and ordering -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, room_id); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, stream_position, room_id); ` const insertEventInTopologySQL = "" + - "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT (topological_position, room_id) DO UPDATE SET event_id = $1" + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + - " ORDER BY topological_position ASC LIMIT $4" + " WHERE room_id = $1 AND" + + "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position = $4 AND stream_position <= $5)" + + " ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + - " ORDER BY topological_position DESC LIMIT $4" + " WHERE room_id = $1 AND" + + "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position = $4 AND stream_position <= $5)" + + " ORDER BY topological_position DESC, stream_position DESC LIMIT $6" const selectPositionInTopologySQL = "" + "SELECT topological_position FROM syncapi_output_room_events_topology" + " WHERE event_id = $1" + // Select the max topological position for the room, then sort by stream position and take the highest, + // returning both topological and stream positions. const selectMaxPositionInTopologySQL = "" + - "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + + " WHERE topological_position=(" + + "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" + + ") ORDER BY stream_position DESC LIMIT 1" const selectEventIDsFromPositionSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + @@ -104,10 +113,10 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { // insertEventInTopology inserts the given event in the room's topology, based // on the event's depth. func (s *outputRoomEventsTopologyStatements) insertEventInTopology( - ctx context.Context, event *gomatrixserverlib.HeaderedEvent, + ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ) (err error) { _, err = s.insertEventInTopologyStmt.ExecContext( - ctx, event.EventID(), event.Depth(), event.RoomID(), + ctx, event.EventID(), event.Depth(), event.RoomID(), pos, ) return } @@ -116,7 +125,7 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology( // given range in a given room's topological order. // Returns an empty slice if no events match the given range. func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( - ctx context.Context, roomID string, fromPos, toPos types.StreamPosition, + ctx context.Context, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool, ) (eventIDs []string, err error) { // Decide on the selection's order according to whether chronological order @@ -129,7 +138,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( } // Query the event IDs. - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit) if err == sql.ErrNoRows { // If no event matched the request, return an empty slice. return []string{}, nil @@ -161,8 +170,8 @@ func (s *outputRoomEventsTopologyStatements) selectPositionInTopology( func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( ctx context.Context, roomID string, -) (pos types.StreamPosition, err error) { - err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos) +) (pos types.StreamPosition, spos types.StreamPosition, err error) { + err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index ef970739..744ae7b8 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -159,7 +159,7 @@ func (d *SyncServerDatasource) WriteEvent( } pduPosition = pos - if err = d.topology.insertEventInTopology(ctx, ev); err != nil { + if err = d.topology.insertEventInTopology(ctx, ev, pos); err != nil { return err } @@ -240,12 +240,13 @@ func (d *SyncServerDatasource) GetEventsInRange( if from.Type == types.PaginationTokenTypeTopology { // Determine the backward and forward limit, i.e. the upper and lower // limits to the selection in the room's topology, from the direction. - var backwardLimit, forwardLimit types.StreamPosition + var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition if backwardOrdering { // Backward ordering is antichronological (latest event to oldest // one). backwardLimit = to.PDUPosition forwardLimit = from.PDUPosition + forwardMicroLimit = from.EDUTypingPosition } else { // Forward ordering is chronological (oldest event to latest one). backwardLimit = from.PDUPosition @@ -255,7 +256,7 @@ func (d *SyncServerDatasource) GetEventsInRange( // Select the event IDs from the defined range. var eIDs []string eIDs, err = d.topology.selectEventIDsInRange( - ctx, roomID, backwardLimit, forwardLimit, limit, !backwardOrdering, + ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, ) if err != nil { return @@ -301,7 +302,7 @@ func (d *SyncServerDatasource) BackwardExtremitiesForRoom( func (d *SyncServerDatasource) MaxTopologicalPosition( ctx context.Context, roomID string, -) (types.StreamPosition, error) { +) (depth types.StreamPosition, stream types.StreamPosition, err error) { return d.topology.selectMaxPositionInTopology(ctx, roomID) } @@ -318,8 +319,13 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition( func (d *SyncServerDatasource) EventPositionInTopology( ctx context.Context, eventID string, -) (types.StreamPosition, error) { - return d.topology.selectPositionInTopology(ctx, eventID) +) (depth types.StreamPosition, stream types.StreamPosition, err error) { + depth, err = d.topology.selectPositionInTopology(ctx, eventID) + if err != nil { + return + } + stream, err = d.events.selectStreamPositionForEventID(ctx, eventID) + return } func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 08299f64..83d7940a 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -74,6 +74,9 @@ const selectEarlyEventsSQL = "" + const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" +const selectStreamPositionForEventIDSQL = "" + + "SELECT id FROM syncapi_output_room_events WHERE event_id = $1" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). /* $1 = oldPos, @@ -99,14 +102,15 @@ const selectStateInRangeSQL = "" + " LIMIT $8" // limit type outputRoomEventsStatements struct { - streamIDStatements *streamIDStatements - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectRecentEventsForSyncStmt *sql.Stmt - selectEarlyEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + streamIDStatements *streamIDStatements + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectRecentEventsForSyncStmt *sql.Stmt + selectEarlyEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt + selectStreamPositionForEventIDStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { @@ -136,9 +140,18 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDState if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { return } + if s.selectStreamPositionForEventIDStmt, err = db.Prepare(selectStreamPositionForEventIDSQL); err != nil { + return + } return } +func (s *outputRoomEventsStatements) selectStreamPositionForEventID(ctx context.Context, eventID string) (types.StreamPosition, error) { + var id int64 + err := s.selectStreamPositionForEventIDStmt.QueryRowContext(ctx, eventID).Scan(&id) + return types.StreamPosition(id), err +} + // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index a2944c2f..d6489466 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -27,37 +27,42 @@ const outputRoomEventsTopologySchema = ` -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology ( event_id TEXT PRIMARY KEY, - topological_position BIGINT NOT NULL, + topological_position BIGINT NOT NULL, + stream_position BIGINT NOT NULL, room_id TEXT NOT NULL, - UNIQUE(topological_position, room_id) + UNIQUE(topological_position, room_id, stream_position) ); -- The topological order will be used in events selection and ordering --- CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, room_id); +-- CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, stream_position, room_id); ` const insertEventInTopologySQL = "" + - "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT (topological_position, room_id) DO UPDATE SET event_id = $1" + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" + + " VALUES ($1, $2, $3, $4)" + + " ON CONFLICT DO NOTHING" const selectEventIDsInRangeASCSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + - " ORDER BY topological_position ASC LIMIT $4" + " WHERE room_id = $1 AND" + + "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position = $4 AND stream_position <= $5)" + + " ORDER BY topological_position ASC, stream_position ASC LIMIT $6" const selectEventIDsInRangeDESCSQL = "" + - "SELECT event_id FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + - " ORDER BY topological_position DESC LIMIT $4" + "SELECT event_id FROM syncapi_output_room_events_topology" + + " WHERE room_id = $1 AND" + + "(topological_position > $2 AND topological_position < $3) OR" + + "(topological_position = $4 AND stream_position <= $5)" + + " ORDER BY topological_position DESC, stream_position DESC LIMIT $6" const selectPositionInTopologySQL = "" + "SELECT topological_position FROM syncapi_output_room_events_topology" + " WHERE event_id = $1" const selectMaxPositionInTopologySQL = "" + - "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" + - " WHERE room_id = $1" + "SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" + + " WHERE room_id = $1 ORDER BY stream_position DESC" const selectEventIDsFromPositionSQL = "" + "SELECT event_id FROM syncapi_output_room_events_topology" + @@ -101,11 +106,11 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { // insertEventInTopology inserts the given event in the room's topology, based // on the event's depth. func (s *outputRoomEventsTopologyStatements) insertEventInTopology( - ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, + ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ) (err error) { stmt := common.TxStmt(txn, s.insertEventInTopologyStmt) _, err = stmt.ExecContext( - ctx, event.EventID(), event.Depth(), event.RoomID(), + ctx, event.EventID(), event.Depth(), event.RoomID(), pos, ) return } @@ -115,7 +120,7 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology( // Returns an empty slice if no events match the given range. func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( ctx context.Context, txn *sql.Tx, roomID string, - fromPos, toPos types.StreamPosition, + fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool, ) (eventIDs []string, err error) { // Decide on the selection's order according to whether chronological order @@ -128,7 +133,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( } // Query the event IDs. - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit) if err == sql.ErrNoRows { // If no event matched the request, return an empty slice. return []string{}, nil @@ -160,9 +165,9 @@ func (s *outputRoomEventsTopologyStatements) selectPositionInTopology( func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( ctx context.Context, txn *sql.Tx, roomID string, -) (pos types.StreamPosition, err error) { +) (pos types.StreamPosition, spos types.StreamPosition, err error) { stmt := common.TxStmt(txn, s.selectMaxPositionInTopologyStmt) - err = stmt.QueryRowContext(ctx, roomID).Scan(&pos) + err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 35774830..bdf943e0 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -194,7 +194,7 @@ func (d *SyncServerDatasource) WriteEvent( } pduPosition = pos - if err = d.topology.insertEventInTopology(ctx, txn, ev); err != nil { + if err = d.topology.insertEventInTopology(ctx, txn, ev, pos); err != nil { return err } @@ -281,14 +281,16 @@ func (d *SyncServerDatasource) GetEventsInRange( // events must be retrieved from the rooms' topology table rather than the // table contaning the syncapi server's whole stream of events. if from.Type == types.PaginationTokenTypeTopology { + // TODO: ARGH CONFUSING // Determine the backward and forward limit, i.e. the upper and lower // limits to the selection in the room's topology, from the direction. - var backwardLimit, forwardLimit types.StreamPosition + var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition if backwardOrdering { // Backward ordering is antichronological (latest event to oldest // one). backwardLimit = to.PDUPosition forwardLimit = from.PDUPosition + forwardMicroLimit = from.EDUTypingPosition } else { // Forward ordering is chronological (oldest event to latest one). backwardLimit = from.PDUPosition @@ -298,7 +300,7 @@ func (d *SyncServerDatasource) GetEventsInRange( // Select the event IDs from the defined range. var eIDs []string eIDs, err = d.topology.selectEventIDsInRange( - ctx, nil, roomID, backwardLimit, forwardLimit, limit, !backwardOrdering, + ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, ) if err != nil { return @@ -328,8 +330,7 @@ func (d *SyncServerDatasource) GetEventsInRange( return } } - - return + return events, err } // SyncPosition returns the latest positions for syncing. @@ -353,7 +354,7 @@ func (d *SyncServerDatasource) BackwardExtremitiesForRoom( // room. func (d *SyncServerDatasource) MaxTopologicalPosition( ctx context.Context, roomID string, -) (types.StreamPosition, error) { +) (types.StreamPosition, types.StreamPosition, error) { return d.topology.selectMaxPositionInTopology(ctx, nil, roomID) } @@ -372,8 +373,13 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition( func (d *SyncServerDatasource) EventPositionInTopology( ctx context.Context, eventID string, -) (types.StreamPosition, error) { - return d.topology.selectPositionInTopology(ctx, nil, eventID) +) (depth types.StreamPosition, stream types.StreamPosition, err error) { + depth, err = d.topology.selectPositionInTopology(ctx, nil, eventID) + if err != nil { + return + } + stream, err = d.events.selectStreamPositionForEventID(ctx, eventID) + return } // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index e591e7ed..a57d5917 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -182,7 +182,7 @@ func TestSyncResponse(t *testing.T) { // limit set to 5 return db.CompleteSync(ctx, testUserIDA, 5) }, - // want the last 5 events, NOT the last 10. + // want the last 5 events WantTimeline: events[len(events)-5:], // want all state for the room WantState: state, @@ -248,11 +248,11 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { db := MustCreateDatabase(t) events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB) MustWriteEvents(t, db, events) - latest, err := db.MaxTopologicalPosition(ctx, testRoomID) + latest, latestStream, err := db.MaxTopologicalPosition(ctx, testRoomID) if err != nil { t.Fatalf("failed to get MaxTopologicalPosition: %s", err) } - from := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, latest, 0) + from := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, latest, latestStream) // head towards the beginning of time to := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0) @@ -265,6 +265,105 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) { assertEventsEqual(t, "", true, gots, reversed(events[len(events)-5:])) } +// The purpose of this test is to make sure that backpagination returns all events, even if some events have the same depth. +// For cases where events have the same depth, the streaming token should be used to tie break so events written via WriteEvent +// will appear FIRST when going backwards. This test creates a DAG like: +// .-----> Message ---. +// Create -> Membership --------> Message -------> Message +// `-----> Message ---` +// depth 1 2 3 4 +// +// With a total depth of 4. It tests that: +// - Backpagination over the whole fork should include all messages and not leave any out. +// - Backpagination from the middle of the fork should not return duplicates (things later than the token). +func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) { + t.Parallel() + db := MustCreateDatabase(t) + + var events []gomatrixserverlib.HeaderedEvent + events = append(events, MustCreateEvent(t, testRoomID, nil, &gomatrixserverlib.EventBuilder{ + Content: []byte(fmt.Sprintf(`{"room_version":"4","creator":"%s"}`, testUserIDA)), + Type: "m.room.create", + StateKey: &emptyStateKey, + Sender: testUserIDA, + Depth: int64(len(events) + 1), + })) + events = append(events, MustCreateEvent(t, testRoomID, []gomatrixserverlib.HeaderedEvent{events[len(events)-1]}, &gomatrixserverlib.EventBuilder{ + Content: []byte(fmt.Sprintf(`{"membership":"join"}`)), + Type: "m.room.member", + StateKey: &testUserIDA, + Sender: testUserIDA, + Depth: int64(len(events) + 1), + })) + // fork the dag into three, same prev_events and depth + parent := []gomatrixserverlib.HeaderedEvent{events[len(events)-1]} + depth := int64(len(events) + 1) + for i := 0; i < 3; i++ { + events = append(events, MustCreateEvent(t, testRoomID, parent, &gomatrixserverlib.EventBuilder{ + Content: []byte(fmt.Sprintf(`{"body":"Message A %d"}`, i+1)), + Type: "m.room.message", + Sender: testUserIDA, + Depth: depth, + })) + } + // merge the fork, prev_events are all 3 messages, depth is increased by 1. + events = append(events, MustCreateEvent(t, testRoomID, events[len(events)-3:], &gomatrixserverlib.EventBuilder{ + Content: []byte(fmt.Sprintf(`{"body":"Message merge"}`)), + Type: "m.room.message", + Sender: testUserIDA, + Depth: depth + 1, + })) + MustWriteEvents(t, db, events) + latestPos, latestStreamPos, err := db.EventPositionInTopology(ctx, events[len(events)-1].EventID()) + if err != nil { + t.Fatalf("failed to get EventPositionInTopology: %s", err) + } + topoPos, streamPos, err := db.EventPositionInTopology(ctx, events[len(events)-3].EventID()) // Message 2 + if err != nil { + t.Fatalf("failed to get EventPositionInTopology for event: %s", err) + } + fromLatest := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, latestPos, latestStreamPos) + fromFork := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, topoPos, streamPos) + // head towards the beginning of time + to := types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0) + + testCases := []struct { + Name string + From *types.PaginationToken + Limit int + Wants []gomatrixserverlib.HeaderedEvent + }{ + { + Name: "Pagination over the whole fork", + From: fromLatest, + Limit: 5, + Wants: reversed(events[len(events)-5:]), + }, + { + Name: "Paginating to the middle of the fork", + From: fromLatest, + Limit: 2, + Wants: reversed(events[len(events)-2:]), + }, + { + Name: "Pagination FROM the middle of the fork", + From: fromFork, + Limit: 3, + Wants: reversed(events[len(events)-5 : len(events)-2]), + }, + } + + for _, tc := range testCases { + // backpaginate messages starting at the latest position. + paginatedEvents, err := db.GetEventsInRange(ctx, tc.From, to, testRoomID, tc.Limit, true) + if err != nil { + t.Fatalf("%s GetEventsInRange returned an error: %s", tc.Name, err) + } + gots := gomatrixserverlib.HeaderedToClientEvents(db.StreamEventsToEvents(&testUserDeviceA, paginatedEvents), gomatrixserverlib.FormatAll) + assertEventsEqual(t, tc.Name, true, gots, tc.Wants) + } +} + func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatrixserverlib.ClientEvent, wants []gomatrixserverlib.HeaderedEvent) { if len(gots) != len(wants) { t.Fatalf("%s response returned %d events, want %d", msg, len(gots), len(wants)) @@ -294,7 +393,8 @@ func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatr t.Errorf("%s event[%d] unsigned mismatch: got %s want %s", msg, i, string(g.Unsigned), string(w.Unsigned())) } if (g.StateKey == nil && w.StateKey() != nil) || (g.StateKey != nil && w.StateKey() == nil) { - t.Fatalf("%s event[%d] state_key [not] missing: got %v want %v", msg, i, g.StateKey, w.StateKey()) + t.Errorf("%s event[%d] state_key [not] missing: got %v want %v", msg, i, g.StateKey, w.StateKey()) + continue } if g.StateKey != nil { if !w.StateKeyEquals(*g.StateKey) { diff --git a/syncapi/types/types.go b/syncapi/types/types.go index cfd49ff1..c04fe521 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -64,8 +64,14 @@ const ( // /sync or /messages, for example. type PaginationToken struct { //Position StreamPosition - Type PaginationTokenType - PDUPosition StreamPosition + Type PaginationTokenType + // For /sync, this is the PDU position. For /messages, this is the topological position (depth). + // TODO: Given how different the positions are depending on the token type, they should probably be renamed + // or use different structs altogether. + PDUPosition StreamPosition + // For /sync, this is the EDU position. For /messages, this is the stream (PDU) position. + // TODO: Given how different the positions are depending on the token type, they should probably be renamed + // or use different structs altogether. EDUTypingPosition StreamPosition }