diff --git a/syncapi/storage/postgres/backwards_extremities_table.go b/syncapi/storage/postgres/backwards_extremities_table.go new file mode 100644 index 00000000..3dd1b234 --- /dev/null +++ b/syncapi/storage/postgres/backwards_extremities_table.go @@ -0,0 +1,106 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" +) + +const backwardExtremitiesSchema = ` +-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The event ID for the last known event. This is the backwards extremity. + event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, + PRIMARY KEY(room_id, event_id, prev_event_id) +); +` + +const insertBackwardExtremitySQL = "" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT DO NOTHING" + +const selectBackwardExtremitiesForRoomSQL = "" + + "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" + +const deleteBackwardExtremitySQL = "" + + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" + +type backwardExtremitiesStatements struct { + insertBackwardExtremityStmt *sql.Stmt + selectBackwardExtremitiesForRoomStmt *sql.Stmt + deleteBackwardExtremityStmt *sql.Stmt +} + +func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) { + s := &backwardExtremitiesStatements{} + _, err := db.Exec(backwardExtremitiesSchema) + if err != nil { + return nil, err + } + if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil { + return nil, err + } + if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { + return nil, err + } + if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { + return nil, err + } + return s, nil +} + +func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, +) (err error) { + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) + return +} + +func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom( + ctx context.Context, roomID string, +) (eventIDs []string, err error) { + rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) + if err != nil { + return + } + defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") + + for rows.Next() { + var eID string + if err = rows.Scan(&eID); err != nil { + return + } + + eventIDs = append(eventIDs, eID) + } + + return eventIDs, rows.Err() +} + +func (s *backwardExtremitiesStatements) DeleteBackwardExtremity( + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, +) (err error) { + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) + return +} diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index ab8f07b2..b6ab7221 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -22,6 +22,7 @@ import ( "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -103,37 +104,38 @@ type currentRoomStateStatements struct { selectStateEventStmt *sql.Stmt } -func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(currentRoomStateSchema) +func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { + s := ¤tRoomStateStatements{} + _, err := db.Exec(currentRoomStateSchema) if err != nil { - return + return nil, err } if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil { - return + return nil, err } if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { - return + return nil, err } if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { - return + return nil, err } if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil { - return + return nil, err } if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { - return + return nil, err } if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil { - return + return nil, err } if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { - return + return nil, err } - return + return s, nil } -// JoinedMemberLists returns a map of room ID to a list of joined user IDs. -func (s *currentRoomStateStatements) selectJoinedUsers( +// SelectJoinedUsers returns a map of room ID to a list of joined user IDs. +func (s *currentRoomStateStatements) SelectJoinedUsers( ctx context.Context, ) (map[string][]string, error) { rows, err := s.selectJoinedUsersStmt.QueryContext(ctx) @@ -157,7 +159,7 @@ func (s *currentRoomStateStatements) selectJoinedUsers( } // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. -func (s *currentRoomStateStatements) selectRoomIDsWithMembership( +func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( ctx context.Context, txn *sql.Tx, userID string, @@ -181,8 +183,8 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership( return result, rows.Err() } -// CurrentState returns all the current state events for the given room. -func (s *currentRoomStateStatements) selectCurrentState( +// SelectCurrentState returns all the current state events for the given room. +func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]gomatrixserverlib.HeaderedEvent, error) { @@ -203,7 +205,7 @@ func (s *currentRoomStateStatements) selectCurrentState( return rowsToEvents(rows) } -func (s *currentRoomStateStatements) deleteRoomStateByEventID( +func (s *currentRoomStateStatements) DeleteRoomStateByEventID( ctx context.Context, txn *sql.Tx, eventID string, ) error { stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt) @@ -211,7 +213,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID( return err } -func (s *currentRoomStateStatements) upsertRoomState( +func (s *currentRoomStateStatements) UpsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition, ) error { @@ -245,7 +247,7 @@ func (s *currentRoomStateStatements) upsertRoomState( return err } -func (s *currentRoomStateStatements) selectEventsWithEventIDs( +func (s *currentRoomStateStatements) SelectEventsWithEventIDs( ctx context.Context, txn *sql.Tx, eventIDs []string, ) ([]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEventsWithEventIDsStmt) @@ -274,7 +276,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) { return result, rows.Err() } -func (s *currentRoomStateStatements) selectStateEvent( +func (s *currentRoomStateStatements) SelectStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { stmt := s.selectStateEventStmt diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 51cbd50d..48ebcf25 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -84,36 +85,37 @@ type outputRoomEventsTopologyStatements struct { selectEventIDsFromPositionStmt *sql.Stmt } -func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(outputRoomEventsTopologySchema) +func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { + s := &outputRoomEventsTopologyStatements{} + _, err := db.Exec(outputRoomEventsTopologySchema) if err != nil { - return + return nil, err } if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { - return + return nil, err } if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { - return + return nil, err } if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { - return + return nil, err } if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { - return + return nil, err } if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { - return + return nil, err } if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil { - return + return nil, err } - return + return s, nil } -// insertEventInTopology inserts the given event in the room's topology, based +// InsertEventInTopology inserts the given event in the room's topology, based // on the event's depth. -func (s *outputRoomEventsTopologyStatements) insertEventInTopology( - ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, +func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( + ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ) (err error) { _, err = s.insertEventInTopologyStmt.ExecContext( ctx, event.EventID(), event.Depth(), event.RoomID(), pos, @@ -121,11 +123,11 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology( return } -// selectEventIDsInRange selects the IDs of events which positions are within a +// SelectEventIDsInRange selects the IDs of events which positions are within a // given range in a given room's topological order. // Returns an empty slice if no events match the given range. -func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( - ctx context.Context, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, +func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( + ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool, ) (eventIDs []string, err error) { // Decide on the selection's order according to whether chronological order @@ -159,26 +161,26 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( return eventIDs, rows.Err() } -// selectPositionInTopology returns the position of a given event in the +// SelectPositionInTopology returns the position of a given event in the // topology of the room it belongs to. -func (s *outputRoomEventsTopologyStatements) selectPositionInTopology( - ctx context.Context, eventID string, +func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( + ctx context.Context, txn *sql.Tx, eventID string, ) (pos, spos types.StreamPosition, err error) { err = s.selectPositionInTopologyStmt.QueryRowContext(ctx, eventID).Scan(&pos, &spos) return } -func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( - ctx context.Context, roomID string, +func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( + ctx context.Context, txn *sql.Tx, roomID string, ) (pos types.StreamPosition, spos types.StreamPosition, err error) { err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } -// selectEventIDsFromPosition returns the IDs of all events that have a given +// SelectEventIDsFromPosition returns the IDs of all events that have a given // position in the topology of a given room. -func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition( - ctx context.Context, roomID string, pos types.StreamPosition, +func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition( + ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition, ) (eventIDs []string, err error) { // Query the event IDs. rows, err := s.selectEventIDsFromPositionStmt.QueryContext(ctx, roomID, pos) diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 4fa08ce5..d44f8b7e 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -16,47 +16,27 @@ package postgres import ( - "context" "database/sql" - "encoding/json" - "fmt" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/roomserver/api" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/storage/shared" - "github.com/matrix-org/dendrite/syncapi/storage/tables" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) -type stateDelta struct { - roomID string - stateEvents []gomatrixserverlib.HeaderedEvent - membership string - // The PDU stream position of the latest membership event for this user, if applicable. - // Can be 0 if there is no membership event in this delta. - membershipPos types.StreamPosition -} - // SyncServerDatasource represents a sync server datasource which manages // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { shared.Database db *sql.DB common.PartitionOffsetStatements - roomstate currentRoomStateStatements - topology outputRoomEventsTopologyStatements - backwardExtremities tables.BackwardsExtremities } -// NewSyncServerDatasource creates a new sync server database -func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProperties) (*SyncServerDatasource, error) { +// NewDatabase creates a new sync server database +func NewDatabase(dbDataSourceName string, dbProperties common.DbProperties) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error if d.db, err = sqlutil.Open("postgres", dbDataSourceName, dbProperties); err != nil { @@ -73,884 +53,31 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp if err != nil { return nil, err } - if err = d.roomstate.prepare(d.db); err != nil { + currState, err := NewPostgresCurrentRoomStateTable(d.db) + if err != nil { return nil, err } invites, err := NewPostgresInvitesTable(d.db) if err != nil { return nil, err } - if err = d.topology.prepare(d.db); err != nil { + topology, err := NewPostgresTopologyTable(d.db) + if err != nil { return nil, err } - d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.PostgresBackwardsExtremitiesStatements{}) + backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db) if err != nil { return nil, err } d.Database = shared.Database{ - DB: d.db, - Invites: invites, - AccountData: accountData, - OutputEvents: events, - EDUCache: cache.New(), + DB: d.db, + Invites: invites, + AccountData: accountData, + OutputEvents: events, + Topology: topology, + CurrentRoomState: currState, + BackwardExtremities: backwardExtremities, + EDUCache: cache.New(), } return &d, nil } - -func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { - return d.roomstate.selectJoinedUsers(ctx) -} - -// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of -// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table -// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. -func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { - if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { - return err - } - - // Check if we have all of the event's previous events. If an event is - // missing, add it to the room's backward extremities. - prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs()) - if err != nil { - return err - } - var found bool - for _, eID := range ev.PrevEventIDs() { - found = false - for _, prevEv := range prevEvents { - if eID == prevEv.EventID() { - found = true - } - } - - // If the event is missing, consider it a backward extremity. - if !found { - if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { - return err - } - } - } - - return nil -} - -func (d *SyncServerDatasource) WriteEvent( - ctx context.Context, - ev *gomatrixserverlib.HeaderedEvent, - addStateEvents []gomatrixserverlib.HeaderedEvent, - addStateEventIDs, removeStateEventIDs []string, - transactionID *api.TransactionID, excludeFromSync bool, -) (pduPosition types.StreamPosition, returnErr error) { - returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { - var err error - pos, err := d.Database.OutputEvents.InsertEvent( - ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, - ) - if err != nil { - return err - } - pduPosition = pos - - if err = d.topology.insertEventInTopology(ctx, ev, pos); err != nil { - return err - } - - if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { - return err - } - - if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { - // Nothing to do, the event may have just been a message event. - return nil - } - - return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) - }) - - return pduPosition, returnErr -} - -func (d *SyncServerDatasource) updateRoomState( - ctx context.Context, txn *sql.Tx, - removedEventIDs []string, - addedEvents []gomatrixserverlib.HeaderedEvent, - pduPosition types.StreamPosition, -) error { - // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. - for _, eventID := range removedEventIDs { - if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil { - return err - } - } - - for _, event := range addedEvents { - if event.StateKey() == nil { - // ignore non state events - continue - } - var membership *string - if event.Type() == "m.room.member" { - value, err := event.Membership() - if err != nil { - return err - } - membership = &value - } - if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { - return err - } - } - - return nil -} - -func (d *SyncServerDatasource) GetStateEvent( - ctx context.Context, roomID, evType, stateKey string, -) (*gomatrixserverlib.HeaderedEvent, error) { - return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) -} - -func (d *SyncServerDatasource) GetStateEventsForRoom( - ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, -) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter) - return err - }) - return -} - -func (d *SyncServerDatasource) GetEventsInTopologicalRange( - ctx context.Context, - from, to *types.TopologyToken, - roomID string, limit int, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - // Determine the backward and forward limit, i.e. the upper and lower - // limits to the selection in the room's topology, from the direction. - var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition - if backwardOrdering { - // Backward ordering is antichronological (latest event to oldest - // one). - backwardLimit = to.Depth() - forwardLimit = from.Depth() - forwardMicroLimit = from.PDUPosition() - } else { - // Forward ordering is chronological (oldest event to latest one). - backwardLimit = from.Depth() - forwardLimit = to.Depth() - } - - // Select the event IDs from the defined range. - var eIDs []string - eIDs, err = d.topology.selectEventIDsInRange( - ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, - ) - if err != nil { - return - } - - // Retrieve the events' contents using their IDs. - events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) - return -} - -func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.StreamingToken, error) { - return d.syncPositionTx(ctx, nil) -} - -func (d *SyncServerDatasource) BackwardExtremitiesForRoom( - ctx context.Context, roomID string, -) (backwardExtremities []string, err error) { - return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) -} - -func (d *SyncServerDatasource) MaxTopologicalPosition( - ctx context.Context, roomID string, -) (depth types.StreamPosition, stream types.StreamPosition, err error) { - return d.topology.selectMaxPositionInTopology(ctx, roomID) -} - -func (d *SyncServerDatasource) EventsAtTopologicalPosition( - ctx context.Context, roomID string, pos types.StreamPosition, -) ([]types.StreamEvent, error) { - eIDs, err := d.topology.selectEventIDsFromPosition(ctx, roomID, pos) - if err != nil { - return nil, err - } - - return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) -} - -func (d *SyncServerDatasource) EventPositionInTopology( - ctx context.Context, eventID string, -) (depth types.StreamPosition, stream types.StreamPosition, err error) { - return d.topology.selectPositionInTopology(ctx, eventID) -} - -func (d *SyncServerDatasource) syncPositionTx( - ctx context.Context, txn *sql.Tx, -) (sp types.StreamingToken, err error) { - - maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) - if err != nil { - return sp, err - } - maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) - if err != nil { - return sp, err - } - if maxAccountDataID > maxEventID { - maxEventID = maxAccountDataID - } - maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) - if err != nil { - return sp, err - } - if maxInviteID > maxEventID { - maxEventID = maxInviteID - } - sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition())) - return -} - -// addPDUDeltaToResponse adds all PDU deltas to a sync response. -// IDs of all rooms the user joined are returned so EDU deltas can be added for them. -func (d *SyncServerDatasource) addPDUDeltaToResponse( - ctx context.Context, - device authtypes.Device, - fromPos, toPos types.StreamPosition, - numRecentEventsPerRoom int, - wantFullState bool, - res *types.Response, -) (joinedRoomIDs []string, err error) { - txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) - if err != nil { - return nil, err - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - - stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request - - // Work out which rooms to return in the response. This is done by getting not only the currently - // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. - // This works out what the 'state' key should be for each room as well as which membership block - // to put the room into. - var deltas []stateDelta - if !wantFullState { - deltas, joinedRoomIDs, err = d.getStateDeltas( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, - ) - } else { - deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, - ) - } - if err != nil { - return nil, err - } - - for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) - if err != nil { - return nil, err - } - } - - // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { - return nil, err - } - - succeeded = true - return joinedRoomIDs, nil -} - -// addTypingDeltaToResponse adds all typing notifications to a sync response -// since the specified position. -func (d *SyncServerDatasource) addTypingDeltaToResponse( - since types.StreamingToken, - joinedRoomIDs []string, - res *types.Response, -) error { - var jr types.JoinResponse - var ok bool - var err error - for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter( - roomID, int64(since.EDUPosition()), - ); updated { - ev := gomatrixserverlib.ClientEvent{ - Type: gomatrixserverlib.MTyping, - } - ev.Content, err = json.Marshal(map[string]interface{}{ - "user_ids": typingUsers, - }) - if err != nil { - return err - } - - if jr, ok = res.Rooms.Join[roomID]; !ok { - jr = *types.NewJoinResponse() - } - jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) - res.Rooms.Join[roomID] = jr - } - } - return nil -} - -// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if -// the positions of that type are not equal in fromPos and toPos. -func (d *SyncServerDatasource) addEDUDeltaToResponse( - fromPos, toPos types.StreamingToken, - joinedRoomIDs []string, - res *types.Response, -) (err error) { - - if fromPos.EDUPosition() != toPos.EDUPosition() { - err = d.addTypingDeltaToResponse( - fromPos, joinedRoomIDs, res, - ) - } - - return -} - -func (d *SyncServerDatasource) IncrementalSync( - ctx context.Context, - device authtypes.Device, - fromPos, toPos types.StreamingToken, - numRecentEventsPerRoom int, - wantFullState bool, -) (*types.Response, error) { - nextBatchPos := fromPos.WithUpdates(toPos) - res := types.NewResponse(nextBatchPos) - - var joinedRoomIDs []string - var err error - if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { - joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res, - ) - } else { - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( - ctx, nil, device.UserID, gomatrixserverlib.Join, - ) - } - if err != nil { - return nil, err - } - - err = d.addEDUDeltaToResponse( - fromPos, toPos, joinedRoomIDs, res, - ) - if err != nil { - return nil, err - } - - return res, nil -} - -// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed -// to it. It returns toPos and joinedRoomIDs for use of adding EDUs. -func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( - ctx context.Context, - userID string, - numRecentEventsPerRoom int, -) ( - res *types.Response, - toPos types.StreamingToken, - joinedRoomIDs []string, - err error, -) { - // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have - // a consistent view of the database throughout. This includes extracting the sync position. - // This does have the unfortunate side-effect that all the matrixy logic resides in this function, - // but it's better to not hide the fact that this is being done in a transaction. - txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) - if err != nil { - return - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - - // Get the current sync position which we will base the sync response on. - toPos, err = d.syncPositionTx(ctx, txn) - if err != nil { - return - } - - res = types.NewResponse(toPos) - - // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return - } - - stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request - - // Build up a /sync response. Add joined rooms. - for _, roomID := range joinedRoomIDs { - var stateEvents []gomatrixserverlib.HeaderedEvent - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilter) - if err != nil { - return - } - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), - numRecentEventsPerRoom, true, true, - ) - if err != nil { - return - } - - // Retrieve the backward topology position, i.e. the position of the - // oldest event in the room's topology. - var prevBatchStr string - if len(recentStreamEvents) > 0 { - var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID()) - if err != nil { - return - } - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) - prevBatch.Decrement() - prevBatchStr = prevBatch.String() - } - - // We don't include a device here as we don't need to send down - // transaction IDs for complete syncs - recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) - stateEvents = removeDuplicates(stateEvents, recentEvents) - jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = prevBatchStr - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - - if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { - return - } - - succeeded = true - return res, toPos, joinedRoomIDs, err -} - -func (d *SyncServerDatasource) CompleteSync( - ctx context.Context, userID string, numRecentEventsPerRoom int, -) (*types.Response, error) { - res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( - ctx, userID, numRecentEventsPerRoom, - ) - if err != nil { - return nil, err - } - - // Use a zero value SyncPosition for fromPos so all EDU states are added. - err = d.addEDUDeltaToResponse( - types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res, - ) - if err != nil { - return nil, err - } - - return res, nil -} - -var txReadOnlySnapshot = sql.TxOptions{ - // Set the isolation level so that we see a snapshot of the database. - // In PostgreSQL repeatable read transactions will see a snapshot taken - // at the first query, and since the transaction is read-only it can't - // run into any serialisation errors. - // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, -} - -func (d *SyncServerDatasource) addInvitesToResponse( - ctx context.Context, txn *sql.Tx, - userID string, - fromPos, toPos types.StreamPosition, - res *types.Response, -) error { - invites, err := d.Database.Invites.SelectInviteEventsInRange( - ctx, txn, userID, fromPos, toPos, - ) - if err != nil { - return err - } - for roomID, inviteEvent := range invites { - ir := types.NewInviteResponse(inviteEvent) - res.Rooms.Invite[roomID] = *ir - } - return nil -} - -// Retrieve the backward topology position, i.e. the position of the -// oldest event in the room's topology. -func (d *SyncServerDatasource) getBackwardTopologyPos( - ctx context.Context, - events []types.StreamEvent, -) (pos, spos types.StreamPosition) { - if len(events) > 0 { - pos, spos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID()) - } - if pos-1 <= 0 { - pos = types.StreamPosition(1) - } else { - pos = pos - 1 - } - return -} - -// addRoomDeltaToResponse adds a room state delta to a sync response -func (d *SyncServerDatasource) addRoomDeltaToResponse( - ctx context.Context, - device *authtypes.Device, - txn *sql.Tx, - fromPos, toPos types.StreamPosition, - delta stateDelta, - numRecentEventsPerRoom int, - res *types.Response, -) error { - endPos := toPos - if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { - // make sure we don't leak recent events after the leave event. - // TODO: History visibility makes this somewhat complex to handle correctly. For example: - // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). - // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave - // in a single /sync request - // This is all "okay" assuming history_visibility == "shared" which it is by default. - endPos = delta.membershipPos - } - recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents( - ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), - numRecentEventsPerRoom, true, true, - ) - if err != nil { - return err - } - recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) - delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents) - prevBatch := types.NewTopologyToken( - backwardTopologyPos, backwardStreamPos, - ) - - switch delta.membership { - case gomatrixserverlib.Join: - jr := types.NewJoinResponse() - - jr.Timeline.PrevBatch = prevBatch.String() - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.roomID] = *jr - case gomatrixserverlib.Leave: - fallthrough // transitions to leave are the same as ban - case gomatrixserverlib.Ban: - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. - lr := types.NewLeaveResponse() - lr.Timeline.PrevBatch = prevBatch.String() - lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.roomID] = *lr - } - - return nil -} - -// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. -// Returns a map of room ID to list of events. -func (d *SyncServerDatasource) fetchStateEvents( - ctx context.Context, txn *sql.Tx, - roomIDToEventIDSet map[string]map[string]bool, - eventIDToEvent map[string]types.StreamEvent, -) (map[string][]types.StreamEvent, error) { - stateBetween := make(map[string][]types.StreamEvent) - missingEvents := make(map[string][]string) - for roomID, ids := range roomIDToEventIDSet { - events := stateBetween[roomID] - for id, need := range ids { - if !need { - continue // deleted state - } - e, ok := eventIDToEvent[id] - if ok { - events = append(events, e) - } else { - m := missingEvents[roomID] - m = append(m, id) - missingEvents[roomID] = m - } - } - stateBetween[roomID] = events - } - - if len(missingEvents) > 0 { - // This happens when add_state_ids has an event ID which is not in the provided range. - // We need to explicitly fetch them. - allMissingEventIDs := []string{} - for _, missingEvIDs := range missingEvents { - allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) - } - evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs) - if err != nil { - return nil, err - } - // we know we got them all otherwise an error would've been returned, so just loop the events - for _, ev := range evs { - roomID := ev.RoomID() - stateBetween[roomID] = append(stateBetween[roomID], ev) - } - } - return stateBetween, nil -} - -func (d *SyncServerDatasource) fetchMissingStateEvents( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) ([]types.StreamEvent, error) { - // Fetch from the events table first so we pick up the stream ID for the - // event. - events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs) - if err != nil { - return nil, err - } - - have := map[string]bool{} - for _, event := range events { - have[event.EventID()] = true - } - var missing []string - for _, eventID := range eventIDs { - if !have[eventID] { - missing = append(missing, eventID) - } - } - if len(missing) == 0 { - return events, nil - } - - // If they are missing from the events table then they should be state - // events that we received from outside the main event stream. - // These should be in the room state table. - stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing) - - if err != nil { - return nil, err - } - if len(stateEvents) != len(missing) { - return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) - } - events = append(events, stateEvents...) - return events, nil -} - -// getStateDeltas returns the state deltas between fromPos and toPos, -// exclusive of oldPos, inclusive of newPos, for the rooms in which -// the user has new membership events. -// A list of joined room IDs is also returned in case the caller needs it. -func (d *SyncServerDatasource) getStateDeltas( - ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, - stateFilter *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { - // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 - // - Get membership list changes for this user in this sync response - // - For each room which has membership list changes: - // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). - // If it is, then we need to send the full room state down (and 'limited' is always true). - // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. - // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. - // - Get all CURRENTLY joined rooms, and add them to 'joined' block. - var deltas []stateDelta - - // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) - if err != nil { - return nil, nil, err - } - state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) - if err != nil { - return nil, nil, err - } - - for roomID, stateStreamEvents := range state { - for _, ev := range stateStreamEvents { - // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. - // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, - // dupe join events will result in the entire room state coming down to the client again. This is added in - // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to - // the timeline. - if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { - if membership == gomatrixserverlib.Join { - // send full room state down instead of a delta - var s []types.StreamEvent - s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter) - if err != nil { - return nil, nil, err - } - state[roomID] = s - continue // we'll add this room in when we do joined rooms - } - - deltas = append(deltas, stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, - }) - break - } - } - } - - // Add in currently joined rooms - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return nil, nil, err - } - for _, joinedRoomID := range joinedRoomIDs { - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), - roomID: joinedRoomID, - }) - } - - return deltas, joinedRoomIDs, nil -} - -// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync -// requests with full_state=true. -// Fetches full state for all joined rooms and uses selectStateInRange to get -// updates for other rooms. -func (d *SyncServerDatasource) getStateDeltasForFullStateSync( - ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, - stateFilter *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return nil, nil, err - } - - // Use a reasonable initial capacity - deltas := make([]stateDelta, 0, len(joinedRoomIDs)) - - // Add full states for all joined rooms - for _, joinedRoomID := range joinedRoomIDs { - s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter) - if stateErr != nil { - return nil, nil, stateErr - } - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, s), - roomID: joinedRoomID, - }) - } - - // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) - if err != nil { - return nil, nil, err - } - state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) - if err != nil { - return nil, nil, err - } - - for roomID, stateStreamEvents := range state { - for _, ev := range stateStreamEvents { - if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { - if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. - deltas = append(deltas, stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, - }) - } - - break - } - } - } - - return deltas, joinedRoomIDs, nil -} - -func (d *SyncServerDatasource) currentStateStreamEventsForRoom( - ctx context.Context, txn *sql.Tx, roomID string, - stateFilter *gomatrixserverlib.StateFilter, -) ([]types.StreamEvent, error) { - allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter) - if err != nil { - return nil, err - } - s := make([]types.StreamEvent, len(allState)) - for i := 0; i < len(s); i++ { - s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0} - } - return s, nil -} - -// There may be some overlap where events in stateEvents are already in recentEvents, so filter -// them out so we don't include them twice in the /sync response. They should be in recentEvents -// only, so clients get to the correct state once they have rolled forward. -func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent { - for _, recentEv := range recentEvents { - if recentEv.StateKey() == nil { - continue // not a state event - } - // TODO: This is a linear scan over all the current state events in this room. This will - // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) - // then do a binary search to find matching events, similar to what roomserver does. - for j := 0; j < len(stateEvents); j++ { - if stateEvents[j].EventID() == recentEv.EventID() { - // overwrite the element to remove with the last element then pop the last element. - // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering - // (we don't care about the order of stateEvents) - stateEvents[j] = stateEvents[len(stateEvents)-1] - stateEvents = stateEvents[:len(stateEvents)-1] - break // there shouldn't be multiple events with the same event ID - } - } - } - return stateEvents -} - -// getMembershipFromEvent returns the value of content.membership iff the event is a state event -// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. -func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { - if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { - membership, err := ev.Membership() - if err != nil { - return "" - } - return membership - } - return "" -} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e9fed758..96e9ff61 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -3,11 +3,14 @@ package shared import ( "context" "database/sql" + "encoding/json" + "fmt" "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -17,11 +20,14 @@ import ( // Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite // For now this contains the shared functions type Database struct { - DB *sql.DB - Invites tables.Invites - AccountData tables.AccountData - OutputEvents tables.Events - EDUCache *cache.EDUCache + DB *sql.DB + Invites tables.Invites + AccountData tables.AccountData + OutputEvents tables.Events + Topology tables.Topology + CurrentRoomState tables.CurrentRoomState + BackwardExtremities tables.BackwardsExtremities + EDUCache *cache.EDUCache } // Events lookups a list of event by their event ID. @@ -82,6 +88,26 @@ func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { d.EDUCache.SetTimeoutCallback(fn) } +func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { + return d.CurrentRoomState.SelectJoinedUsers(ctx) +} + +func (d *Database) GetStateEvent( + ctx context.Context, roomID, evType, stateKey string, +) (*gomatrixserverlib.HeaderedEvent, error) { + return d.CurrentRoomState.SelectStateEvent(ctx, roomID, evType, stateKey) +} + +func (d *Database) GetStateEventsForRoom( + ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, +) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) { + err = common.WithTransaction(d.DB, func(txn *sql.Tx) error { + stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) + return err + }) + return +} + func (d *Database) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { var maxID int64 var err error @@ -182,3 +208,863 @@ func (d *Database) StreamEventsToEvents(device *authtypes.Device, in []types.Str } return out } + +// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of +// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table +// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. +func (d *Database) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { + if err := d.BackwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + return err + } + + // Check if we have all of the event's previous events. If an event is + // missing, add it to the room's backward extremities. + prevEvents, err := d.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs()) + if err != nil { + return err + } + var found bool + for _, eID := range ev.PrevEventIDs() { + found = false + for _, prevEv := range prevEvents { + if eID == prevEv.EventID() { + found = true + } + } + + // If the event is missing, consider it a backward extremity. + if !found { + if err = d.BackwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { + return err + } + } + } + + return nil +} + +func (d *Database) WriteEvent( + ctx context.Context, + ev *gomatrixserverlib.HeaderedEvent, + addStateEvents []gomatrixserverlib.HeaderedEvent, + addStateEventIDs, removeStateEventIDs []string, + transactionID *api.TransactionID, excludeFromSync bool, +) (pduPosition types.StreamPosition, returnErr error) { + returnErr = common.WithTransaction(d.DB, func(txn *sql.Tx) error { + var err error + pos, err := d.OutputEvents.InsertEvent( + ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, + ) + if err != nil { + return err + } + pduPosition = pos + + if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil { + return err + } + + if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { + return err + } + + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { + // Nothing to do, the event may have just been a message event. + return nil + } + + return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) + }) + + return pduPosition, returnErr +} + +func (d *Database) updateRoomState( + ctx context.Context, txn *sql.Tx, + removedEventIDs []string, + addedEvents []gomatrixserverlib.HeaderedEvent, + pduPosition types.StreamPosition, +) error { + // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. + for _, eventID := range removedEventIDs { + if err := d.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil { + return err + } + } + + for _, event := range addedEvents { + if event.StateKey() == nil { + // ignore non state events + continue + } + var membership *string + if event.Type() == "m.room.member" { + value, err := event.Membership() + if err != nil { + return err + } + membership = &value + } + if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { + return err + } + } + + return nil +} + +func (d *Database) GetEventsInTopologicalRange( + ctx context.Context, + from, to *types.TopologyToken, + roomID string, limit int, + backwardOrdering bool, +) (events []types.StreamEvent, err error) { + // Determine the backward and forward limit, i.e. the upper and lower + // limits to the selection in the room's topology, from the direction. + var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition + if backwardOrdering { + // Backward ordering is antichronological (latest event to oldest + // one). + backwardLimit = to.Depth() + forwardLimit = from.Depth() + forwardMicroLimit = from.PDUPosition() + } else { + // Forward ordering is chronological (oldest event to latest one). + backwardLimit = from.Depth() + forwardLimit = to.Depth() + } + + // Select the event IDs from the defined range. + var eIDs []string + eIDs, err = d.Topology.SelectEventIDsInRange( + ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, + ) + if err != nil { + return + } + + // Retrieve the events' contents using their IDs. + events, err = d.OutputEvents.SelectEvents(ctx, nil, eIDs) + return +} + +func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) { + err = common.WithTransaction(d.DB, func(txn *sql.Tx) error { + pos, err := d.SyncPositionTx(ctx, txn) + if err != nil { + return err + } + tok = pos + return nil + }) + return +} + +func (d *Database) BackwardExtremitiesForRoom( + ctx context.Context, roomID string, +) (backwardExtremities []string, err error) { + return d.BackwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) +} + +func (d *Database) MaxTopologicalPosition( + ctx context.Context, roomID string, +) (depth types.StreamPosition, stream types.StreamPosition, err error) { + return d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID) +} + +func (d *Database) EventsAtTopologicalPosition( + ctx context.Context, roomID string, pos types.StreamPosition, +) ([]types.StreamEvent, error) { + eIDs, err := d.Topology.SelectEventIDsFromPosition(ctx, nil, roomID, pos) + if err != nil { + return nil, err + } + + return d.OutputEvents.SelectEvents(ctx, nil, eIDs) +} + +func (d *Database) EventPositionInTopology( + ctx context.Context, eventID string, +) (depth types.StreamPosition, stream types.StreamPosition, err error) { + return d.Topology.SelectPositionInTopology(ctx, nil, eventID) +} + +// TODO FIXME TEMPORARY PUBLIC +func (d *Database) SyncPositionTx( + ctx context.Context, txn *sql.Tx, +) (sp types.StreamingToken, err error) { + + maxEventID, err := d.OutputEvents.SelectMaxEventID(ctx, txn) + if err != nil { + return sp, err + } + maxAccountDataID, err := d.AccountData.SelectMaxAccountDataID(ctx, txn) + if err != nil { + return sp, err + } + if maxAccountDataID > maxEventID { + maxEventID = maxAccountDataID + } + maxInviteID, err := d.Invites.SelectMaxInviteID(ctx, txn) + if err != nil { + return sp, err + } + if maxInviteID > maxEventID { + maxEventID = maxInviteID + } + sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition())) + return +} + +// addPDUDeltaToResponse adds all PDU deltas to a sync response. +// IDs of all rooms the user joined are returned so EDU deltas can be added for them. +func (d *Database) addPDUDeltaToResponse( + ctx context.Context, + device authtypes.Device, + fromPos, toPos types.StreamPosition, + numRecentEventsPerRoom int, + wantFullState bool, + res *types.Response, +) (joinedRoomIDs []string, err error) { + txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return nil, err + } + var succeeded bool + defer func() { + txerr := common.EndTransaction(txn, &succeeded) + if err == nil && txerr != nil { + err = txerr + } + }() + + stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request + + // Work out which rooms to return in the response. This is done by getting not only the currently + // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. + // This works out what the 'state' key should be for each room as well as which membership block + // to put the room into. + var deltas []stateDelta + if !wantFullState { + deltas, joinedRoomIDs, err = d.getStateDeltas( + ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, + ) + } else { + deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( + ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, + ) + } + if err != nil { + return nil, err + } + + for _, delta := range deltas { + err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + if err != nil { + return nil, err + } + } + + // TODO: This should be done in getStateDeltas + if err = d.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + return nil, err + } + + succeeded = true + return joinedRoomIDs, nil +} + +// addTypingDeltaToResponse adds all typing notifications to a sync response +// since the specified position. +func (d *Database) addTypingDeltaToResponse( + since types.StreamingToken, + joinedRoomIDs []string, + res *types.Response, +) error { + var jr types.JoinResponse + var ok bool + var err error + for _, roomID := range joinedRoomIDs { + if typingUsers, updated := d.EDUCache.GetTypingUsersIfUpdatedAfter( + roomID, int64(since.EDUPosition()), + ); updated { + ev := gomatrixserverlib.ClientEvent{ + Type: gomatrixserverlib.MTyping, + } + ev.Content, err = json.Marshal(map[string]interface{}{ + "user_ids": typingUsers, + }) + if err != nil { + return err + } + + if jr, ok = res.Rooms.Join[roomID]; !ok { + jr = *types.NewJoinResponse() + } + jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + res.Rooms.Join[roomID] = jr + } + } + return nil +} + +// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if +// the positions of that type are not equal in fromPos and toPos. +// TODO FIXME TEMPORARY PUBLIC +func (d *Database) AddEDUDeltaToResponse( + fromPos, toPos types.StreamingToken, + joinedRoomIDs []string, + res *types.Response, +) (err error) { + + if fromPos.EDUPosition() != toPos.EDUPosition() { + err = d.addTypingDeltaToResponse( + fromPos, joinedRoomIDs, res, + ) + } + + return +} + +func (d *Database) IncrementalSync( + ctx context.Context, + device authtypes.Device, + fromPos, toPos types.StreamingToken, + numRecentEventsPerRoom int, + wantFullState bool, +) (*types.Response, error) { + nextBatchPos := fromPos.WithUpdates(toPos) + res := types.NewResponse(nextBatchPos) + + var joinedRoomIDs []string + var err error + if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { + joinedRoomIDs, err = d.addPDUDeltaToResponse( + ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res, + ) + } else { + joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership( + ctx, nil, device.UserID, gomatrixserverlib.Join, + ) + } + if err != nil { + return nil, err + } + + err = d.AddEDUDeltaToResponse( + fromPos, toPos, joinedRoomIDs, res, + ) + if err != nil { + return nil, err + } + + return res, nil +} + +// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed +// to it. It returns toPos and joinedRoomIDs for use of adding EDUs. +func (d *Database) getResponseWithPDUsForCompleteSync( + ctx context.Context, + userID string, + numRecentEventsPerRoom int, +) ( + res *types.Response, + toPos types.StreamingToken, + joinedRoomIDs []string, + err error, +) { + // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have + // a consistent view of the database throughout. This includes extracting the sync position. + // This does have the unfortunate side-effect that all the matrixy logic resides in this function, + // but it's better to not hide the fact that this is being done in a transaction. + txn, err := d.DB.BeginTx(ctx, &txReadOnlySnapshot) + if err != nil { + return + } + var succeeded bool + defer func() { + txerr := common.EndTransaction(txn, &succeeded) + if err == nil && txerr != nil { + err = txerr + } + }() + + // Get the current sync position which we will base the sync response on. + toPos, err = d.SyncPositionTx(ctx, txn) + if err != nil { + return + } + + res = types.NewResponse(toPos) + + // Extract room state and recent events for all rooms the user is joined to. + joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) + if err != nil { + return + } + + stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request + + // Build up a /sync response. Add joined rooms. + for _, roomID := range joinedRoomIDs { + var stateEvents []gomatrixserverlib.HeaderedEvent + stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, &stateFilter) + if err != nil { + return + } + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []types.StreamEvent + recentStreamEvents, err = d.OutputEvents.SelectRecentEvents( + ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), + numRecentEventsPerRoom, true, true, + ) + if err != nil { + return + } + + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var prevBatchStr string + if len(recentStreamEvents) > 0 { + var backwardTopologyPos, backwardStreamPos types.StreamPosition + backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) + if err != nil { + return + } + prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) + prevBatch.Decrement() + prevBatchStr = prevBatch.String() + } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs + recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr := types.NewJoinResponse() + jr.Timeline.PrevBatch = prevBatchStr + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr + } + + if err = d.AddInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { + return + } + + succeeded = true + return res, toPos, joinedRoomIDs, err +} + +func (d *Database) CompleteSync( + ctx context.Context, userID string, numRecentEventsPerRoom int, +) (*types.Response, error) { + res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( + ctx, userID, numRecentEventsPerRoom, + ) + if err != nil { + return nil, err + } + + // Use a zero value SyncPosition for fromPos so all EDU states are added. + err = d.AddEDUDeltaToResponse( + types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res, + ) + if err != nil { + return nil, err + } + + return res, nil +} + +var txReadOnlySnapshot = sql.TxOptions{ + // Set the isolation level so that we see a snapshot of the database. + // In PostgreSQL repeatable read transactions will see a snapshot taken + // at the first query, and since the transaction is read-only it can't + // run into any serialisation errors. + // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, +} + +// TODO FIXME temporary public +func (d *Database) AddInvitesToResponse( + ctx context.Context, txn *sql.Tx, + userID string, + fromPos, toPos types.StreamPosition, + res *types.Response, +) error { + invites, err := d.Invites.SelectInviteEventsInRange( + ctx, txn, userID, fromPos, toPos, + ) + if err != nil { + return err + } + for roomID, inviteEvent := range invites { + ir := types.NewInviteResponse(inviteEvent) + res.Rooms.Invite[roomID] = *ir + } + return nil +} + +// Retrieve the backward topology position, i.e. the position of the +// oldest event in the room's topology. +func (d *Database) getBackwardTopologyPos( + ctx context.Context, txn *sql.Tx, + events []types.StreamEvent, +) (types.TopologyToken, error) { + zeroToken := types.NewTopologyToken(0, 0) + if len(events) == 0 { + return zeroToken, nil + } + pos, spos, err := d.Topology.SelectPositionInTopology(ctx, txn, events[0].EventID()) + if err != nil { + return zeroToken, err + } + tok := types.NewTopologyToken(pos, spos) + tok.Decrement() + return tok, nil +} + +// addRoomDeltaToResponse adds a room state delta to a sync response +func (d *Database) addRoomDeltaToResponse( + ctx context.Context, + device *authtypes.Device, + txn *sql.Tx, + fromPos, toPos types.StreamPosition, + delta stateDelta, + numRecentEventsPerRoom int, + res *types.Response, +) error { + endPos := toPos + if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + endPos = delta.membershipPos + } + recentStreamEvents, err := d.OutputEvents.SelectRecentEvents( + ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), + numRecentEventsPerRoom, true, true, + ) + if err != nil { + return err + } + recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back + prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents) + if err != nil { + return err + } + + switch delta.membership { + case gomatrixserverlib.Join: + jr := types.NewJoinResponse() + + jr.Timeline.PrevBatch = prevBatch.String() + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case gomatrixserverlib.Leave: + fallthrough // transitions to leave are the same as ban + case gomatrixserverlib.Ban: + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + lr.Timeline.PrevBatch = prevBatch.String() + lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr + } + + return nil +} + +// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. +// Returns a map of room ID to list of events. +func (d *Database) fetchStateEvents( + ctx context.Context, txn *sql.Tx, + roomIDToEventIDSet map[string]map[string]bool, + eventIDToEvent map[string]types.StreamEvent, +) (map[string][]types.StreamEvent, error) { + stateBetween := make(map[string][]types.StreamEvent) + missingEvents := make(map[string][]string) + for roomID, ids := range roomIDToEventIDSet { + events := stateBetween[roomID] + for id, need := range ids { + if !need { + continue // deleted state + } + e, ok := eventIDToEvent[id] + if ok { + events = append(events, e) + } else { + m := missingEvents[roomID] + m = append(m, id) + missingEvents[roomID] = m + } + } + stateBetween[roomID] = events + } + + if len(missingEvents) > 0 { + // This happens when add_state_ids has an event ID which is not in the provided range. + // We need to explicitly fetch them. + allMissingEventIDs := []string{} + for _, missingEvIDs := range missingEvents { + allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) + } + evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs) + if err != nil { + return nil, err + } + // we know we got them all otherwise an error would've been returned, so just loop the events + for _, ev := range evs { + roomID := ev.RoomID() + stateBetween[roomID] = append(stateBetween[roomID], ev) + } + } + return stateBetween, nil +} + +func (d *Database) fetchMissingStateEvents( + ctx context.Context, txn *sql.Tx, eventIDs []string, +) ([]types.StreamEvent, error) { + // Fetch from the events table first so we pick up the stream ID for the + // event. + events, err := d.OutputEvents.SelectEvents(ctx, txn, eventIDs) + if err != nil { + return nil, err + } + + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range eventIDs { + if !have[eventID] { + missing = append(missing, eventID) + } + } + if len(missing) == 0 { + return events, nil + } + + // If they are missing from the events table then they should be state + // events that we received from outside the main event stream. + // These should be in the room state table. + stateEvents, err := d.CurrentRoomState.SelectEventsWithEventIDs(ctx, txn, missing) + + if err != nil { + return nil, err + } + if len(stateEvents) != len(missing) { + return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) + } + events = append(events, stateEvents...) + return events, nil +} + +// getStateDeltas returns the state deltas between fromPos and toPos, +// exclusive of oldPos, inclusive of newPos, for the rooms in which +// the user has new membership events. +// A list of joined room IDs is also returned in case the caller needs it. +func (d *Database) getStateDeltas( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos types.StreamPosition, userID string, + stateFilter *gomatrixserverlib.StateFilter, +) ([]stateDelta, []string, error) { + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. + // - Get all CURRENTLY joined rooms, and add them to 'joined' block. + var deltas []stateDelta + + // get all the state events ever between these two positions + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + if err != nil { + return nil, nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership == gomatrixserverlib.Join { + // send full room state down instead of a delta + var s []types.StreamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter) + if err != nil { + return nil, nil, err + } + state[roomID] = s + continue // we'll add this room in when we do joined rooms + } + + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.StreamPosition, + stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + break + } + } + } + + // Add in currently joined rooms + joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) + if err != nil { + return nil, nil, err + } + for _, joinedRoomID := range joinedRoomIDs { + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Join, + stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), + roomID: joinedRoomID, + }) + } + + return deltas, joinedRoomIDs, nil +} + +// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// requests with full_state=true. +// Fetches full state for all joined rooms and uses selectStateInRange to get +// updates for other rooms. +func (d *Database) getStateDeltasForFullStateSync( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos types.StreamPosition, userID string, + stateFilter *gomatrixserverlib.StateFilter, +) ([]stateDelta, []string, error) { + joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) + if err != nil { + return nil, nil, err + } + + // Use a reasonable initial capacity + deltas := make([]stateDelta, 0, len(joinedRoomIDs)) + + // Add full states for all joined rooms + for _, joinedRoomID := range joinedRoomIDs { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter) + if stateErr != nil { + return nil, nil, stateErr + } + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Join, + stateEvents: d.StreamEventsToEvents(device, s), + roomID: joinedRoomID, + }) + } + + // Get all the state events ever between these two positions + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + if err != nil { + return nil, nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.StreamPosition, + stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + } + + break + } + } + } + + return deltas, joinedRoomIDs, nil +} + +func (d *Database) currentStateStreamEventsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, + stateFilter *gomatrixserverlib.StateFilter, +) ([]types.StreamEvent, error) { + allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) + if err != nil { + return nil, err + } + s := make([]types.StreamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0} + } + return s, nil +} + +// There may be some overlap where events in stateEvents are already in recentEvents, so filter +// them out so we don't include them twice in the /sync response. They should be in recentEvents +// only, so clients get to the correct state once they have rolled forward. +func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent { + for _, recentEv := range recentEvents { + if recentEv.StateKey() == nil { + continue // not a state event + } + // TODO: This is a linear scan over all the current state events in this room. This will + // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) + // then do a binary search to find matching events, similar to what roomserver does. + for j := 0; j < len(stateEvents); j++ { + if stateEvents[j].EventID() == recentEv.EventID() { + // overwrite the element to remove with the last element then pop the last element. + // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering + // (we don't care about the order of stateEvents) + stateEvents[j] = stateEvents[len(stateEvents)-1] + stateEvents = stateEvents[:len(stateEvents)-1] + break // there shouldn't be multiple events with the same event ID + } + } + } + return stateEvents +} + +// getMembershipFromEvent returns the value of content.membership iff the event is a state event +// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. +func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + membership, err := ev.Membership() + if err != nil { + return "" + } + return membership + } + return "" +} + +type stateDelta struct { + roomID string + stateEvents []gomatrixserverlib.HeaderedEvent + membership string + // The PDU stream position of the latest membership event for this user, if applicable. + // Can be 0 if there is no membership event in this delta. + membershipPos types.StreamPosition +} diff --git a/syncapi/storage/sqlite3/backwards_extremities_table.go b/syncapi/storage/sqlite3/backwards_extremities_table.go new file mode 100644 index 00000000..a172f6bf --- /dev/null +++ b/syncapi/storage/sqlite3/backwards_extremities_table.go @@ -0,0 +1,106 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" +) + +const backwardExtremitiesSchema = ` +-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The event ID for the last known event. This is the backwards extremity. + event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, + PRIMARY KEY(room_id, event_id, prev_event_id) +); +` + +const insertBackwardExtremitySQL = "" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" + +const selectBackwardExtremitiesForRoomSQL = "" + + "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" + +const deleteBackwardExtremitySQL = "" + + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" + +type backwardExtremitiesStatements struct { + insertBackwardExtremityStmt *sql.Stmt + selectBackwardExtremitiesForRoomStmt *sql.Stmt + deleteBackwardExtremityStmt *sql.Stmt +} + +func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) { + s := &backwardExtremitiesStatements{} + _, err := db.Exec(backwardExtremitiesSchema) + if err != nil { + return nil, err + } + if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil { + return nil, err + } + if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { + return nil, err + } + if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { + return nil, err + } + return s, nil +} + +func (s *backwardExtremitiesStatements) InsertsBackwardExtremity( + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, +) (err error) { + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) + return +} + +func (s *backwardExtremitiesStatements) SelectBackwardExtremitiesForRoom( + ctx context.Context, roomID string, +) (eventIDs []string, err error) { + rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) + if err != nil { + return + } + defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") + + for rows.Next() { + var eID string + if err = rows.Scan(&eID); err != nil { + return + } + + eventIDs = append(eventIDs, eID) + } + + return eventIDs, rows.Err() +} + +func (s *backwardExtremitiesStatements) DeleteBackwardExtremity( + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, +) (err error) { + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) + return +} diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 9fafdbed..b540fbc6 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -91,35 +92,37 @@ type currentRoomStateStatements struct { selectStateEventStmt *sql.Stmt } -func (s *currentRoomStateStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { - s.streamIDStatements = streamID - _, err = db.Exec(currentRoomStateSchema) +func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (tables.CurrentRoomState, error) { + s := ¤tRoomStateStatements{ + streamIDStatements: streamID, + } + _, err := db.Exec(currentRoomStateSchema) if err != nil { - return + return nil, err } if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil { - return + return nil, err } if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { - return + return nil, err } if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil { - return + return nil, err } if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil { - return + return nil, err } if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil { - return + return nil, err } if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { - return + return nil, err } - return + return s, nil } // JoinedMemberLists returns a map of room ID to a list of joined user IDs. -func (s *currentRoomStateStatements) selectJoinedUsers( +func (s *currentRoomStateStatements) SelectJoinedUsers( ctx context.Context, ) (map[string][]string, error) { rows, err := s.selectJoinedUsersStmt.QueryContext(ctx) @@ -143,7 +146,7 @@ func (s *currentRoomStateStatements) selectJoinedUsers( } // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. -func (s *currentRoomStateStatements) selectRoomIDsWithMembership( +func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( ctx context.Context, txn *sql.Tx, userID string, @@ -168,7 +171,7 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership( } // CurrentState returns all the current state events for the given room. -func (s *currentRoomStateStatements) selectCurrentState( +func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, ) ([]gomatrixserverlib.HeaderedEvent, error) { @@ -189,7 +192,7 @@ func (s *currentRoomStateStatements) selectCurrentState( return rowsToEvents(rows) } -func (s *currentRoomStateStatements) deleteRoomStateByEventID( +func (s *currentRoomStateStatements) DeleteRoomStateByEventID( ctx context.Context, txn *sql.Tx, eventID string, ) error { stmt := common.TxStmt(txn, s.deleteRoomStateByEventIDStmt) @@ -197,7 +200,7 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID( return err } -func (s *currentRoomStateStatements) upsertRoomState( +func (s *currentRoomStateStatements) UpsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition, ) error { @@ -231,7 +234,7 @@ func (s *currentRoomStateStatements) upsertRoomState( return err } -func (s *currentRoomStateStatements) selectEventsWithEventIDs( +func (s *currentRoomStateStatements) SelectEventsWithEventIDs( ctx context.Context, txn *sql.Tx, eventIDs []string, ) ([]types.StreamEvent, error) { iEventIDs := make([]interface{}, len(eventIDs)) @@ -264,7 +267,7 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.HeaderedEvent, error) { return result, nil } -func (s *currentRoomStateStatements) selectStateEvent( +func (s *currentRoomStateStatements) SelectStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { stmt := s.selectStateEventStmt diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index 0d313d7c..4469f5b7 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -77,35 +78,36 @@ type outputRoomEventsTopologyStatements struct { selectEventIDsFromPositionStmt *sql.Stmt } -func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(outputRoomEventsTopologySchema) +func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { + s := &outputRoomEventsTopologyStatements{} + _, err := db.Exec(outputRoomEventsTopologySchema) if err != nil { - return + return nil, err } if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { - return + return nil, err } if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { - return + return nil, err } if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { - return + return nil, err } if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { - return + return nil, err } if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { - return + return nil, err } if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil { - return + return nil, err } - return + return s, nil } // insertEventInTopology inserts the given event in the room's topology, based // on the event's depth. -func (s *outputRoomEventsTopologyStatements) insertEventInTopology( +func (s *outputRoomEventsTopologyStatements) InsertEventInTopology( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition, ) (err error) { stmt := common.TxStmt(txn, s.insertEventInTopologyStmt) @@ -118,7 +120,7 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology( // selectEventIDsInRange selects the IDs of events which positions are within a // given range in a given room's topological order. // Returns an empty slice if no events match the given range. -func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( +func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool, @@ -155,7 +157,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( // selectPositionInTopology returns the position of a given event in the // topology of the room it belongs to. -func (s *outputRoomEventsTopologyStatements) selectPositionInTopology( +func (s *outputRoomEventsTopologyStatements) SelectPositionInTopology( ctx context.Context, txn *sql.Tx, eventID string, ) (pos types.StreamPosition, spos types.StreamPosition, err error) { stmt := common.TxStmt(txn, s.selectPositionInTopologyStmt) @@ -163,7 +165,7 @@ func (s *outputRoomEventsTopologyStatements) selectPositionInTopology( return } -func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( +func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( ctx context.Context, txn *sql.Tx, roomID string, ) (pos types.StreamPosition, spos types.StreamPosition, err error) { stmt := common.TxStmt(txn, s.selectMaxPositionInTopologyStmt) @@ -173,7 +175,7 @@ func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( // selectEventIDsFromPosition returns the IDs of all events that have a given // position in the topology of a given room. -func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition( +func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition( ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition, ) (eventIDs []string, err error) { // Query the event IDs. diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 0da05eab..e23aeca4 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -16,18 +16,11 @@ package sqlite3 import ( - "context" "database/sql" - "encoding/json" "errors" - "fmt" "net/url" - "github.com/sirupsen/logrus" - - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/roomserver/api" // Import the sqlite3 package _ "github.com/mattn/go-sqlite3" @@ -35,35 +28,20 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/storage/shared" - "github.com/matrix-org/dendrite/syncapi/storage/tables" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" ) -type stateDelta struct { - roomID string - stateEvents []gomatrixserverlib.HeaderedEvent - membership string - // The PDU stream position of the latest membership event for this user, if applicable. - // Can be 0 if there is no membership event in this delta. - membershipPos types.StreamPosition -} - // SyncServerDatasource represents a sync server datasource which manages // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { shared.Database db *sql.DB common.PartitionOffsetStatements - streamID streamIDStatements - roomstate currentRoomStateStatements - topology outputRoomEventsTopologyStatements - backwardExtremities tables.BackwardsExtremities + streamID streamIDStatements } -// NewSyncServerDatasource creates a new sync server database +// NewDatabase creates a new sync server database // nolint: gocyclo -func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, error) { +func NewDatabase(dataSourceName string) (*SyncServerDatasource, error) { var d SyncServerDatasource uri, err := url.Parse(dataSourceName) if err != nil { @@ -101,982 +79,31 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } - if err = d.roomstate.prepare(d.db, &d.streamID); err != nil { + roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID) + if err != nil { return err } invites, err := NewSqliteInvitesTable(d.db, &d.streamID) if err != nil { return err } - if err = d.topology.prepare(d.db); err != nil { + topology, err := NewSqliteTopologyTable(d.db) + if err != nil { return err } - d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.SqliteBackwardsExtremitiesStatements{}) + bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db) if err != nil { return err } d.Database = shared.Database{ - DB: d.db, - Invites: invites, - AccountData: accountData, - OutputEvents: events, - EDUCache: cache.New(), + DB: d.db, + Invites: invites, + AccountData: accountData, + OutputEvents: events, + BackwardExtremities: bwExtrem, + CurrentRoomState: roomState, + Topology: topology, + EDUCache: cache.New(), } return nil } - -// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. -func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) { - return d.roomstate.selectJoinedUsers(ctx) -} - -// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of -// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table -// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. -func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { - if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { - return err - } - - // Check if we have all of the event's previous events. If an event is - // missing, add it to the room's backward extremities. - prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs()) - if err != nil { - return err - } - var found bool - for _, eID := range ev.PrevEventIDs() { - found = false - for _, prevEv := range prevEvents { - if eID == prevEv.EventID() { - found = true - } - } - - // If the event is missing, consider it a backward extremity. - if !found { - if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { - return err - } - } - } - - return nil -} - -// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races -// when generating the sync stream position for this event. Returns the sync stream position for the inserted event. -// Returns an error if there was a problem inserting this event. -func (d *SyncServerDatasource) WriteEvent( - ctx context.Context, - ev *gomatrixserverlib.HeaderedEvent, - addStateEvents []gomatrixserverlib.HeaderedEvent, - addStateEventIDs, removeStateEventIDs []string, - transactionID *api.TransactionID, excludeFromSync bool, -) (pduPosition types.StreamPosition, returnErr error) { - returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { - var err error - pos, err := d.Database.OutputEvents.InsertEvent( - ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, - ) - if err != nil { - return err - } - pduPosition = pos - - if err = d.topology.insertEventInTopology(ctx, txn, ev, pos); err != nil { - return err - } - - if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil { - return err - } - - if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { - // Nothing to do, the event may have just been a message event. - return nil - } - - return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition) - }) - - return pduPosition, returnErr -} - -func (d *SyncServerDatasource) updateRoomState( - ctx context.Context, txn *sql.Tx, - removedEventIDs []string, - addedEvents []gomatrixserverlib.HeaderedEvent, - pduPosition types.StreamPosition, -) error { - // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. - for _, eventID := range removedEventIDs { - if err := d.roomstate.deleteRoomStateByEventID(ctx, txn, eventID); err != nil { - return err - } - } - - for _, event := range addedEvents { - if event.StateKey() == nil { - // ignore non state events - continue - } - var membership *string - if event.Type() == "m.room.member" { - value, err := event.Membership() - if err != nil { - return err - } - membership = &value - } - if err := d.roomstate.upsertRoomState(ctx, txn, event, membership, pduPosition); err != nil { - return err - } - } - - return nil -} - -// SyncPosition returns the latest positions for syncing. -func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - pos, err := d.syncPositionTx(ctx, txn) - if err != nil { - return err - } - tok = *pos - return nil - }) - return -} - -// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key -// If no event could be found, returns nil -// If there was an issue during the retrieval, returns an error -func (d *SyncServerDatasource) GetStateEvent( - ctx context.Context, roomID, evType, stateKey string, -) (*gomatrixserverlib.HeaderedEvent, error) { - return d.roomstate.selectStateEvent(ctx, roomID, evType, stateKey) -} - -// GetStateEventsForRoom fetches the state events for a given room. -// Returns an empty slice if no state events could be found for this room. -// Returns an error if there was an issue with the retrieval. -func (d *SyncServerDatasource) GetStateEventsForRoom( - ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, -) (stateEvents []gomatrixserverlib.HeaderedEvent, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart) - return err - }) - return -} - -// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the -// given extremities and limit. -func (d *SyncServerDatasource) GetEventsInTopologicalRange( - ctx context.Context, - from, to *types.TopologyToken, - roomID string, limit int, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - // TODO: ARGH CONFUSING - // Determine the backward and forward limit, i.e. the upper and lower - // limits to the selection in the room's topology, from the direction. - var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition - if backwardOrdering { - // Backward ordering is antichronological (latest event to oldest - // one). - backwardLimit = to.Depth() - forwardLimit = from.Depth() - forwardMicroLimit = from.PDUPosition() - } else { - // Forward ordering is chronological (oldest event to latest one). - backwardLimit = from.Depth() - forwardLimit = to.Depth() - } - - // Select the event IDs from the defined range. - var eIDs []string - eIDs, err = d.topology.selectEventIDsInRange( - ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering, - ) - if err != nil { - return - } - - // Retrieve the events' contents using their IDs. - events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) - return -} - -// BackwardExtremitiesForRoom returns the event IDs of all of the backward -// extremities we know of for a given room. -func (d *SyncServerDatasource) BackwardExtremitiesForRoom( - ctx context.Context, roomID string, -) (backwardExtremities []string, err error) { - return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) -} - -// MaxTopologicalPosition returns the highest topological position for a given -// room. -func (d *SyncServerDatasource) MaxTopologicalPosition( - ctx context.Context, roomID string, -) (types.StreamPosition, types.StreamPosition, error) { - return d.topology.selectMaxPositionInTopology(ctx, nil, roomID) -} - -// EventsAtTopologicalPosition returns all of the events matching a given -// position in the topology of a given room. -func (d *SyncServerDatasource) EventsAtTopologicalPosition( - ctx context.Context, roomID string, pos types.StreamPosition, -) ([]types.StreamEvent, error) { - eIDs, err := d.topology.selectEventIDsFromPosition(ctx, nil, roomID, pos) - if err != nil { - return nil, err - } - - return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) -} - -func (d *SyncServerDatasource) EventPositionInTopology( - ctx context.Context, eventID string, -) (depth types.StreamPosition, stream types.StreamPosition, err error) { - return d.topology.selectPositionInTopology(ctx, nil, eventID) -} - -// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. -func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos types.StreamPosition, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - pos, err = d.syncStreamPositionTx(ctx, txn) - return err - }) - return -} - -func (d *SyncServerDatasource) syncStreamPositionTx( - ctx context.Context, txn *sql.Tx, -) (types.StreamPosition, error) { - maxID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) - if err != nil { - return 0, err - } - maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) - if err != nil { - return 0, err - } - if maxAccountDataID > maxID { - maxID = maxAccountDataID - } - maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) - if err != nil { - return 0, err - } - if maxInviteID > maxID { - maxID = maxInviteID - } - return types.StreamPosition(maxID), nil -} - -func (d *SyncServerDatasource) syncPositionTx( - ctx context.Context, txn *sql.Tx, -) (*types.StreamingToken, error) { - - maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) - if err != nil { - return nil, err - } - maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) - if err != nil { - return nil, err - } - if maxAccountDataID > maxEventID { - maxEventID = maxAccountDataID - } - maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) - if err != nil { - return nil, err - } - if maxInviteID > maxEventID { - maxEventID = maxInviteID - } - sp := types.NewStreamToken( - types.StreamPosition(maxEventID), - types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition()), - ) - return &sp, nil -} - -// addPDUDeltaToResponse adds all PDU deltas to a sync response. -// IDs of all rooms the user joined are returned so EDU deltas can be added for them. -func (d *SyncServerDatasource) addPDUDeltaToResponse( - ctx context.Context, - device authtypes.Device, - fromPos, toPos types.StreamPosition, - numRecentEventsPerRoom int, - wantFullState bool, - res *types.Response, -) (joinedRoomIDs []string, err error) { - txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) - if err != nil { - return nil, err - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - - stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request - - // Work out which rooms to return in the response. This is done by getting not only the currently - // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. - // This works out what the 'state' key should be for each room as well as which membership block - // to put the room into. - var deltas []stateDelta - if !wantFullState { - deltas, joinedRoomIDs, err = d.getStateDeltas( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart, - ) - } else { - deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilterPart, - ) - } - if err != nil { - return nil, err - } - - for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) - if err != nil { - return nil, err - } - } - - // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { - return nil, err - } - - succeeded = true - return joinedRoomIDs, nil -} - -// addTypingDeltaToResponse adds all typing notifications to a sync response -// since the specified position. -func (d *SyncServerDatasource) addTypingDeltaToResponse( - since types.StreamingToken, - joinedRoomIDs []string, - res *types.Response, -) error { - var jr types.JoinResponse - var ok bool - var err error - for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter( - roomID, int64(since.EDUPosition()), - ); updated { - ev := gomatrixserverlib.ClientEvent{ - Type: gomatrixserverlib.MTyping, - } - ev.Content, err = json.Marshal(map[string]interface{}{ - "user_ids": typingUsers, - }) - if err != nil { - return err - } - - if jr, ok = res.Rooms.Join[roomID]; !ok { - jr = *types.NewJoinResponse() - } - jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) - res.Rooms.Join[roomID] = jr - } - } - return nil -} - -// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if -// the positions of that type are not equal in fromPos and toPos. -func (d *SyncServerDatasource) addEDUDeltaToResponse( - fromPos, toPos types.StreamingToken, - joinedRoomIDs []string, - res *types.Response, -) (err error) { - - if fromPos.EDUPosition() != toPos.EDUPosition() { - err = d.addTypingDeltaToResponse( - fromPos, joinedRoomIDs, res, - ) - } - - return -} - -// IncrementalSync returns all the data needed in order to create an incremental -// sync response for the given user. Events returned will include any client -// transaction IDs associated with the given device. These transaction IDs come -// from when the device sent the event via an API that included a transaction -// ID. -func (d *SyncServerDatasource) IncrementalSync( - ctx context.Context, - device authtypes.Device, - fromPos, toPos types.StreamingToken, - numRecentEventsPerRoom int, - wantFullState bool, -) (*types.Response, error) { - nextBatchPos := fromPos.WithUpdates(toPos) - res := types.NewResponse(nextBatchPos) - - var joinedRoomIDs []string - var err error - fmt.Println("from", fromPos.PDUPosition(), "to", toPos.PDUPosition()) - if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { - joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res, - ) - } else { - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( - ctx, nil, device.UserID, gomatrixserverlib.Join, - ) - } - if err != nil { - return nil, err - } - - err = d.addEDUDeltaToResponse( - fromPos, toPos, joinedRoomIDs, res, - ) - if err != nil { - return nil, err - } - - return res, nil -} - -// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed -// to it. It returns toPos and joinedRoomIDs for use of adding EDUs. -func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( - ctx context.Context, - userID string, - numRecentEventsPerRoom int, -) ( - res *types.Response, - toPos *types.StreamingToken, - joinedRoomIDs []string, - err error, -) { - // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have - // a consistent view of the database throughout. This includes extracting the sync position. - // This does have the unfortunate side-effect that all the matrixy logic resides in this function, - // but it's better to not hide the fact that this is being done in a transaction. - txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) - if err != nil { - return - } - var succeeded bool - defer func() { - txerr := common.EndTransaction(txn, &succeeded) - if err == nil && txerr != nil { - err = txerr - } - }() - - // Get the current sync position which we will base the sync response on. - toPos, err = d.syncPositionTx(ctx, txn) - if err != nil { - return - } - - res = types.NewResponse(*toPos) - - // Extract room state and recent events for all rooms the user is joined to. - joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return - } - - stateFilterPart := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request - - // Build up a /sync response. Add joined rooms. - for _, roomID := range joinedRoomIDs { - var stateEvents []gomatrixserverlib.HeaderedEvent - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &stateFilterPart) - if err != nil { - return - } - //fmt.Println("State events:", stateEvents) - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), - numRecentEventsPerRoom, true, true, - ) - if err != nil { - return - } - //fmt.Println("Recent stream events:", recentStreamEvents) - - // Retrieve the backward topology position, i.e. the position of the - // oldest event in the room's topology. - var prevBatchStr string - if len(recentStreamEvents) > 0 { - var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) - if err != nil { - return - } - prevBatch := types.NewTopologyToken(backwardTopologyPos, backwardStreamPos) - prevBatch.Decrement() - prevBatchStr = prevBatch.String() - } - - // We don't include a device here as we don't need to send down - // transaction IDs for complete syncs - recentEvents := d.StreamEventsToEvents(nil, recentStreamEvents) - stateEvents = removeDuplicates(stateEvents, recentEvents) - jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = prevBatchStr - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - - if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { - return - } - - succeeded = true - return res, toPos, joinedRoomIDs, err -} - -// CompleteSync returns a complete /sync API response for the given user. -func (d *SyncServerDatasource) CompleteSync( - ctx context.Context, userID string, numRecentEventsPerRoom int, -) (*types.Response, error) { - res, toPos, joinedRoomIDs, err := d.getResponseWithPDUsForCompleteSync( - ctx, userID, numRecentEventsPerRoom, - ) - if err != nil { - return nil, err - } - - // Use a zero value SyncPosition for fromPos so all EDU states are added. - err = d.addEDUDeltaToResponse( - types.NewStreamToken(0, 0), *toPos, joinedRoomIDs, res, - ) - if err != nil { - return nil, err - } - - return res, nil -} - -var txReadOnlySnapshot = sql.TxOptions{ - // Set the isolation level so that we see a snapshot of the database. - // In PostgreSQL repeatable read transactions will see a snapshot taken - // at the first query, and since the transaction is read-only it can't - // run into any serialisation errors. - // https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ - Isolation: sql.LevelRepeatableRead, - ReadOnly: true, -} - -func (d *SyncServerDatasource) addInvitesToResponse( - ctx context.Context, txn *sql.Tx, - userID string, - fromPos, toPos types.StreamPosition, - res *types.Response, -) error { - invites, err := d.Database.Invites.SelectInviteEventsInRange( - ctx, txn, userID, fromPos, toPos, - ) - if err != nil { - return err - } - for roomID, inviteEvent := range invites { - ir := types.NewInviteResponse(inviteEvent) - res.Rooms.Invite[roomID] = *ir - } - return nil -} - -// Retrieve the backward topology position, i.e. the position of the -// oldest event in the room's topology. -func (d *SyncServerDatasource) getBackwardTopologyPos( - ctx context.Context, txn *sql.Tx, - events []types.StreamEvent, -) (pos, spos types.StreamPosition) { - if len(events) > 0 { - pos, spos, _ = d.topology.selectPositionInTopology(ctx, txn, events[0].EventID()) - } - // go to the previous position so we don't pull out the same event twice - // FIXME: This could be done more nicely by being explicit with inclusive/exclusive rules - if pos-1 <= 0 { - pos = types.StreamPosition(1) - } else { - pos = pos - 1 - spos += 1000 // this has to be bigger than the number of events we backfill per request - } - return -} - -// addRoomDeltaToResponse adds a room state delta to a sync response -func (d *SyncServerDatasource) addRoomDeltaToResponse( - ctx context.Context, - device *authtypes.Device, - txn *sql.Tx, - fromPos, toPos types.StreamPosition, - delta stateDelta, - numRecentEventsPerRoom int, - res *types.Response, -) error { - endPos := toPos - if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { - // make sure we don't leak recent events after the leave event. - // TODO: History visibility makes this somewhat complex to handle correctly. For example: - // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). - // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave - // in a single /sync request - // This is all "okay" assuming history_visibility == "shared" which it is by default. - endPos = delta.membershipPos - } - recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents( - ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), - numRecentEventsPerRoom, true, true, - ) - if err != nil { - return err - } - recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) - delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) - backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents) - prevBatch := types.NewTopologyToken( - backwardTopologyPos, backwardStreamPos, - ) - - switch delta.membership { - case gomatrixserverlib.Join: - jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = prevBatch.String() - jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Join[delta.roomID] = *jr - case gomatrixserverlib.Leave: - fallthrough // transitions to leave are the same as ban - case gomatrixserverlib.Ban: - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. - lr := types.NewLeaveResponse() - lr.Timeline.PrevBatch = prevBatch.String() - lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) - res.Rooms.Leave[delta.roomID] = *lr - } - - return nil -} - -// fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. -// Returns a map of room ID to list of events. -func (d *SyncServerDatasource) fetchStateEvents( - ctx context.Context, txn *sql.Tx, - roomIDToEventIDSet map[string]map[string]bool, - eventIDToEvent map[string]types.StreamEvent, -) (map[string][]types.StreamEvent, error) { - stateBetween := make(map[string][]types.StreamEvent) - missingEvents := make(map[string][]string) - for roomID, ids := range roomIDToEventIDSet { - events := stateBetween[roomID] - for id, need := range ids { - if !need { - continue // deleted state - } - e, ok := eventIDToEvent[id] - if ok { - events = append(events, e) - } else { - m := missingEvents[roomID] - m = append(m, id) - missingEvents[roomID] = m - } - } - stateBetween[roomID] = events - } - - if len(missingEvents) > 0 { - // This happens when add_state_ids has an event ID which is not in the provided range. - // We need to explicitly fetch them. - allMissingEventIDs := []string{} - for _, missingEvIDs := range missingEvents { - allMissingEventIDs = append(allMissingEventIDs, missingEvIDs...) - } - evs, err := d.fetchMissingStateEvents(ctx, txn, allMissingEventIDs) - if err != nil { - return nil, err - } - // we know we got them all otherwise an error would've been returned, so just loop the events - for _, ev := range evs { - roomID := ev.RoomID() - stateBetween[roomID] = append(stateBetween[roomID], ev) - } - } - return stateBetween, nil -} - -func (d *SyncServerDatasource) fetchMissingStateEvents( - ctx context.Context, txn *sql.Tx, eventIDs []string, -) ([]types.StreamEvent, error) { - // Fetch from the events table first so we pick up the stream ID for the - // event. - events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs) - if err != nil { - return nil, err - } - - have := map[string]bool{} - for _, event := range events { - have[event.EventID()] = true - } - var missing []string - for _, eventID := range eventIDs { - if !have[eventID] { - missing = append(missing, eventID) - } - } - if len(missing) == 0 { - return events, nil - } - - // If they are missing from the events table then they should be state - // events that we received from outside the main event stream. - // These should be in the room state table. - stateEvents, err := d.roomstate.selectEventsWithEventIDs(ctx, txn, missing) - - if err != nil { - return nil, err - } - if len(stateEvents) != len(missing) { - return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(stateEvents), len(missing)) - } - events = append(events, stateEvents...) - return events, nil -} - -// getStateDeltas returns the state deltas between fromPos and toPos, -// exclusive of oldPos, inclusive of newPos, for the rooms in which -// the user has new membership events. -// A list of joined room IDs is also returned in case the caller needs it. -func (d *SyncServerDatasource) getStateDeltas( - ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, - stateFilterPart *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { - // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 - // - Get membership list changes for this user in this sync response - // - For each room which has membership list changes: - // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). - // If it is, then we need to send the full room state down (and 'limited' is always true). - // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. - // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. - // - Get all CURRENTLY joined rooms, and add them to 'joined' block. - var deltas []stateDelta - - // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) - if err != nil { - return nil, nil, err - } - state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) - if err != nil { - return nil, nil, err - } - - for roomID, stateStreamEvents := range state { - for _, ev := range stateStreamEvents { - // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. - // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, - // dupe join events will result in the entire room state coming down to the client again. This is added in - // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to - // the timeline. - if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" { - if membership == gomatrixserverlib.Join { - // send full room state down instead of a delta - var s []types.StreamEvent - s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilterPart) - if err != nil { - return nil, nil, err - } - state[roomID] = s - continue // we'll add this room in when we do joined rooms - } - - deltas = append(deltas, stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, - }) - break - } - } - } - - // Add in currently joined rooms - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return nil, nil, err - } - for _, joinedRoomID := range joinedRoomIDs { - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, state[joinedRoomID]), - roomID: joinedRoomID, - }) - } - - return deltas, joinedRoomIDs, nil -} - -// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync -// requests with full_state=true. -// Fetches full state for all joined rooms and uses selectStateInRange to get -// updates for other rooms. -func (d *SyncServerDatasource) getStateDeltasForFullStateSync( - ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, - stateFilterPart *gomatrixserverlib.StateFilter, -) ([]stateDelta, []string, error) { - joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) - if err != nil { - return nil, nil, err - } - - // Use a reasonable initial capacity - deltas := make([]stateDelta, 0, len(joinedRoomIDs)) - - // Add full states for all joined rooms - for _, joinedRoomID := range joinedRoomIDs { - s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilterPart) - if stateErr != nil { - return nil, nil, stateErr - } - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Join, - stateEvents: d.StreamEventsToEvents(device, s), - roomID: joinedRoomID, - }) - } - - // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) - if err != nil { - return nil, nil, err - } - state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) - if err != nil { - return nil, nil, err - } - - for roomID, stateStreamEvents := range state { - for _, ev := range stateStreamEvents { - if membership := getMembershipFromEvent(&ev.HeaderedEvent, userID); membership != "" { - if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above. - deltas = append(deltas, stateDelta{ - membership: membership, - membershipPos: ev.StreamPosition, - stateEvents: d.StreamEventsToEvents(device, stateStreamEvents), - roomID: roomID, - }) - } - - break - } - } - } - - return deltas, joinedRoomIDs, nil -} - -func (d *SyncServerDatasource) currentStateStreamEventsForRoom( - ctx context.Context, txn *sql.Tx, roomID string, - stateFilterPart *gomatrixserverlib.StateFilter, -) ([]types.StreamEvent, error) { - allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilterPart) - if err != nil { - return nil, err - } - s := make([]types.StreamEvent, len(allState)) - for i := 0; i < len(s); i++ { - s[i] = types.StreamEvent{HeaderedEvent: allState[i], StreamPosition: 0} - } - return s, nil -} - -// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and -// matches the streamevent.transactionID device then the transaction ID gets -// added to the unsigned section of the output event. -func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent { - out := make([]gomatrixserverlib.HeaderedEvent, len(in)) - for i := 0; i < len(in); i++ { - out[i] = in[i].HeaderedEvent - if device != nil && in[i].TransactionID != nil { - if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID { - err := out[i].SetUnsignedField( - "transaction_id", in[i].TransactionID.TransactionID, - ) - if err != nil { - logrus.WithFields(logrus.Fields{ - "event_id": out[i].EventID(), - }).WithError(err).Warnf("Failed to add transaction ID to event") - } - } - } - } - return out -} - -// There may be some overlap where events in stateEvents are already in recentEvents, so filter -// them out so we don't include them twice in the /sync response. They should be in recentEvents -// only, so clients get to the correct state once they have rolled forward. -func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.HeaderedEvent) []gomatrixserverlib.HeaderedEvent { - for _, recentEv := range recentEvents { - if recentEv.StateKey() == nil { - continue // not a state event - } - // TODO: This is a linear scan over all the current state events in this room. This will - // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) - // then do a binary search to find matching events, similar to what roomserver does. - for j := 0; j < len(stateEvents); j++ { - if stateEvents[j].EventID() == recentEv.EventID() { - // overwrite the element to remove with the last element then pop the last element. - // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering - // (we don't care about the order of stateEvents) - stateEvents[j] = stateEvents[len(stateEvents)-1] - stateEvents = stateEvents[:len(stateEvents)-1] - break // there shouldn't be multiple events with the same event ID - } - } - } - return stateEvents -} - -// getMembershipFromEvent returns the value of content.membership iff the event is a state event -// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. -func getMembershipFromEvent(ev *gomatrixserverlib.HeaderedEvent, userID string) string { - if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { - membership, err := ev.Membership() - if err != nil { - return "" - } - return membership - } - return "" -} diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index 9574f37b..76a4b7a4 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -28,14 +28,14 @@ import ( func NewSyncServerDatasource(dataSourceName string, dbProperties common.DbProperties) (Database, error) { uri, err := url.Parse(dataSourceName) if err != nil { - return postgres.NewSyncServerDatasource(dataSourceName, dbProperties) + return postgres.NewDatabase(dataSourceName, dbProperties) } switch uri.Scheme { case "postgres": - return postgres.NewSyncServerDatasource(dataSourceName, dbProperties) + return postgres.NewDatabase(dataSourceName, dbProperties) case "file": - return sqlite3.NewSyncServerDatasource(dataSourceName) + return sqlite3.NewDatabase(dataSourceName) default: - return postgres.NewSyncServerDatasource(dataSourceName, dbProperties) + return postgres.NewDatabase(dataSourceName, dbProperties) } } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index f7fa1a87..bffcfd05 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -51,7 +51,7 @@ func MustCreateEvent(t *testing.T, roomID string, prevs []gomatrixserverlib.Head } func MustCreateDatabase(t *testing.T) storage.Database { - db, err := sqlite3.NewSyncServerDatasource("file::memory:") + db, err := sqlite3.NewDatabase("file::memory:") if err != nil { t.Fatalf("NewSyncServerDatasource returned %s", err) } diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go index 84bd9df9..666179af 100644 --- a/syncapi/storage/storage_wasm.go +++ b/syncapi/storage/storage_wasm.go @@ -35,7 +35,7 @@ func NewSyncServerDatasource( case "postgres": return nil, fmt.Errorf("Cannot use postgres implementation") case "file": - return sqlite3.NewSyncServerDatasource(dataSourceName) + return sqlite3.NewDatabase(dataSourceName) default: return nil, fmt.Errorf("Cannot use postgres implementation") } diff --git a/syncapi/storage/tables/backward_extremities.go b/syncapi/storage/tables/backward_extremities.go deleted file mode 100644 index babd9aaa..00000000 --- a/syncapi/storage/tables/backward_extremities.go +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tables - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/common" -) - -// BackwardsExtremitiesStatements contains the SQL statements to implement. -// See BackwardsExtremities to see the parameter and response types. -type BackwardsExtremitiesStatements interface { - Schema() string - InsertBackwardExtremity() string - SelectBackwardExtremitiesForRoom() string - DeleteBackwardExtremity() string -} - -type PostgresBackwardsExtremitiesStatements struct{} - -func (s *PostgresBackwardsExtremitiesStatements) Schema() string { - return `-- Stores output room events received from the roomserver. - CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( - -- The 'room_id' key for the event. - room_id TEXT NOT NULL, - -- The event ID for the last known event. This is the backwards extremity. - event_id TEXT NOT NULL, - -- The prev_events for the last known event. This is used to update extremities. - prev_event_id TEXT NOT NULL, - - PRIMARY KEY(room_id, event_id, prev_event_id) - ); - ` -} -func (s *PostgresBackwardsExtremitiesStatements) InsertBackwardExtremity() string { - return "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT DO NOTHING" -} -func (s *PostgresBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string { - return "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -} -func (s *PostgresBackwardsExtremitiesStatements) DeleteBackwardExtremity() string { - return "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" -} - -type SqliteBackwardsExtremitiesStatements struct{} - -func (s *SqliteBackwardsExtremitiesStatements) Schema() string { - return `-- Stores output room events received from the roomserver. -CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( - -- The 'room_id' key for the event. - room_id TEXT NOT NULL, - -- The event ID for the last known event. This is the backwards extremity. - event_id TEXT NOT NULL, - -- The prev_events for the last known event. This is used to update extremities. - prev_event_id TEXT NOT NULL, - - PRIMARY KEY(room_id, event_id, prev_event_id) -); -` -} - -func (s *SqliteBackwardsExtremitiesStatements) InsertBackwardExtremity() string { - return "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" -} - -func (s *SqliteBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string { - return "" + - "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" -} - -func (s *SqliteBackwardsExtremitiesStatements) DeleteBackwardExtremity() string { - return "" + - "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" -} - -// BackwardsExtremities keeps track of backwards extremities for a room. -// Backwards extremities are the earliest (DAG-wise) known events which we have -// the entire event JSON. These event IDs are used in federation requests to fetch -// even earlier events. -// -// We persist the previous event IDs as well, one per row, so when we do fetch even -// earlier events we can simply delete rows which referenced it. Consider the graph: -// A -// | Event C has 1 prev_event ID: A. -// B C -// |___| Event D has 2 prev_event IDs: B and C. -// | -// D -// The earliest known event we have is D, so this table has 2 rows. -// A backfill request gives us C but not B. We delete rows where prev_event=C. This -// still means that D is a backwards extremity as we do not have event B. However, event -// C is *also* a backwards extremity at this point as we do not have event A. Later, -// when we fetch event B, we delete rows where prev_event=B. This then removes D as -// a backwards extremity because there are no more rows with event_id=B. -type BackwardsExtremities struct { - insertBackwardExtremityStmt *sql.Stmt - selectBackwardExtremitiesForRoomStmt *sql.Stmt - deleteBackwardExtremityStmt *sql.Stmt -} - -// NewBackwardsExtremities prepares the table -func NewBackwardsExtremities(db *sql.DB, stmts BackwardsExtremitiesStatements) (table BackwardsExtremities, err error) { - _, err = db.Exec(stmts.Schema()) - if err != nil { - return - } - if table.insertBackwardExtremityStmt, err = db.Prepare(stmts.InsertBackwardExtremity()); err != nil { - return - } - if table.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(stmts.SelectBackwardExtremitiesForRoom()); err != nil { - return - } - if table.deleteBackwardExtremityStmt, err = db.Prepare(stmts.DeleteBackwardExtremity()); err != nil { - return - } - return -} - -// InsertsBackwardExtremity inserts a new backwards extremity. -func (s *BackwardsExtremities) InsertsBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, -) (err error) { - _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) - return -} - -// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room. -func (s *BackwardsExtremities) SelectBackwardExtremitiesForRoom( - ctx context.Context, roomID string, -) (eventIDs []string, err error) { - rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) - if err != nil { - return - } - defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") - - for rows.Next() { - var eID string - if err = rows.Scan(&eID); err != nil { - return - } - - eventIDs = append(eventIDs, eID) - } - - return eventIDs, rows.Err() -} - -// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed. -func (s *BackwardsExtremities) DeleteBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, knownEventID string, -) (err error) { - _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) - return -} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ffd50216..8f0b8b89 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -30,3 +30,61 @@ type Events interface { SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) } + +type Topology interface { + // InsertEventInTopology inserts the given event in the room's topology, based + // on the event's depth. + InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error) + // SelectEventIDsInRange selects the IDs of events which positions are within a + // given range in a given room's topological order. + // Returns an empty slice if no events match the given range. + SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error) + // SelectPositionInTopology returns the position of a given event in the + // topology of the room it belongs to. + SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos, spos types.StreamPosition, err error) + SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (pos types.StreamPosition, spos types.StreamPosition, err error) + // SelectEventIDsFromPosition returns the IDs of all events that have a given + // position in the topology of a given room. + SelectEventIDsFromPosition(ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition) (eventIDs []string, err error) +} + +type CurrentRoomState interface { + SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) + SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) + UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error + DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error + // SelectCurrentState returns all the current state events for the given room. + SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]gomatrixserverlib.HeaderedEvent, error) + // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. + SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) + // SelectJoinedUsers returns a map of room ID to a list of joined user IDs. + SelectJoinedUsers(ctx context.Context) (map[string][]string, error) +} + +// BackwardsExtremities keeps track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. +type BackwardsExtremities interface { + // InsertsBackwardExtremity inserts a new backwards extremity. + InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error) + // SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room. + SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (eventIDs []string, err error) + // DeleteBackwardExtremity removes a backwards extremity for a room, if one existed. + DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error) +}