diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index 66316ac4..0158c8f7 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" + "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -92,7 +93,7 @@ func (r *RoomserverInternalAPI) updateLatestEvents( type latestEventsUpdater struct { ctx context.Context api *RoomserverInternalAPI - updater types.RoomRecentEventsUpdater + updater *shared.LatestEventsUpdater roomNID types.RoomNID stateAtEvent types.StateAtEvent event gomatrixserverlib.Event diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input_membership.go index af0c7f8b..bcecfca0 100644 --- a/roomserver/internal/input_membership.go +++ b/roomserver/internal/input_membership.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -29,7 +30,7 @@ import ( // consumers about the invites added or retired by the change in current state. func (r *RoomserverInternalAPI) updateMemberships( ctx context.Context, - updater types.RoomRecentEventsUpdater, + updater *shared.LatestEventsUpdater, removed, added []types.StateEntry, ) ([]api.OutputEvent, error) { changes := membershipChanges(removed, added) @@ -77,7 +78,7 @@ func (r *RoomserverInternalAPI) updateMemberships( } func (r *RoomserverInternalAPI) updateMembership( - updater types.RoomRecentEventsUpdater, + updater *shared.LatestEventsUpdater, targetUserNID types.EventStateKeyNID, remove, add *gomatrixserverlib.Event, updates []api.OutputEvent, @@ -141,7 +142,7 @@ func (r *RoomserverInternalAPI) isLocalTarget(event *gomatrixserverlib.Event) bo } func updateToInviteMembership( - mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, + mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, roomVersion gomatrixserverlib.RoomVersion, ) ([]api.OutputEvent, error) { // We may have already sent the invite to the user, either because we are @@ -171,7 +172,7 @@ func updateToInviteMembership( } func updateToJoinMembership( - mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, + mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { // If the user is already marked as being joined, we call SetToJoin to update // the event ID then we can return immediately. Retired is ignored as there @@ -207,7 +208,7 @@ func updateToJoinMembership( } func updateToLeaveMembership( - mu types.MembershipUpdater, add *gomatrixserverlib.Event, + mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, newMembership string, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { // If the user is already neither joined, nor invited to the room then we diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index afe5bcb1..988fc908 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -18,6 +18,7 @@ import ( "context" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -86,7 +87,7 @@ type Database interface { // The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error. // Returns the latest events in the room and the last eventID sent to the log along with an updater. // If this returns an error then no further action is required. - GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error) + GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (*shared.LatestEventsUpdater, error) // Look up event ID by transaction's info. // This is used to determine if the room event is processed/processing already. // Returns an empty string if no such event exists. @@ -123,7 +124,7 @@ type Database interface { // Returns an error if there was a problem talking to the database. RemoveRoomAlias(ctx context.Context, alias string) error // Build a membership updater for the target user in a room. - MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (types.MembershipUpdater, error) + MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (*shared.MembershipUpdater, error) // Lookup the membership of a given user in a given room. // Returns the numeric ID of the latest membership event sent from this user // in this room, along a boolean set to true if the user is still in this room, diff --git a/roomserver/storage/shared/room_recent_events_updater.go b/roomserver/storage/shared/latest_events_updater.go similarity index 68% rename from roomserver/storage/shared/room_recent_events_updater.go rename to roomserver/storage/shared/latest_events_updater.go index 8131f712..21b168a4 100644 --- a/roomserver/storage/shared/room_recent_events_updater.go +++ b/roomserver/storage/shared/latest_events_updater.go @@ -8,7 +8,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -type roomRecentEventsUpdater struct { +type LatestEventsUpdater struct { transaction d *Database roomNID types.RoomNID @@ -17,11 +17,7 @@ type roomRecentEventsUpdater struct { currentStateSnapshotNID types.StateSnapshotNID } -func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID, useTxns bool) (types.RoomRecentEventsUpdater, error) { - txn, err := d.DB.Begin() - if err != nil { - return nil, err - } +func NewLatestEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomNID types.RoomNID) (*LatestEventsUpdater, error) { eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomNID) if err != nil { @@ -41,38 +37,34 @@ func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types. return nil, err } } - if !useTxns { - txn.Commit() // nolint: errcheck - txn = nil - } - return &roomRecentEventsUpdater{ + return &LatestEventsUpdater{ transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID, }, nil } // RoomVersion implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) { +func (u *LatestEventsUpdater) RoomVersion() (version gomatrixserverlib.RoomVersion) { version, _ = u.d.GetRoomVersionForRoomNID(u.ctx, u.roomNID) return } // LatestEvents implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) LatestEvents() []types.StateAtEventAndReference { +func (u *LatestEventsUpdater) LatestEvents() []types.StateAtEventAndReference { return u.latestEvents } // LastEventIDSent implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) LastEventIDSent() string { +func (u *LatestEventsUpdater) LastEventIDSent() string { return u.lastEventIDSent } // CurrentStateSnapshotNID implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID { +func (u *LatestEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID { return u.currentStateSnapshotNID } // StorePreviousEvents implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { +func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { for _, ref := range previousEventReferences { if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { return err @@ -82,7 +74,7 @@ func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, p } // IsReferenced implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) { +func (u *LatestEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) { err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256) if err == nil { return true, nil @@ -94,7 +86,7 @@ func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib. } // SetLatestEvents implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) SetLatestEvents( +func (u *LatestEventsUpdater) SetLatestEvents( roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID, currentStateSnapshotNID types.StateSnapshotNID, ) error { @@ -106,15 +98,15 @@ func (u *roomRecentEventsUpdater) SetLatestEvents( } // HasEventBeenSent implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) { +func (u *LatestEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) { return u.d.EventsTable.SelectEventSentToOutput(u.ctx, u.txn, eventNID) } // MarkEventAsSent implements types.RoomRecentEventsUpdater -func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error { +func (u *LatestEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error { return u.d.EventsTable.UpdateEventSentToOutput(u.ctx, u.txn, eventNID) } -func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (types.MembershipUpdater, error) { +func (u *LatestEventsUpdater) MembershipUpdater(targetUserNID types.EventStateKeyNID, targetLocal bool) (*MembershipUpdater, error) { return u.d.membershipUpdaterTxn(u.ctx, u.txn, u.roomNID, targetUserNID, targetLocal) } diff --git a/roomserver/storage/shared/membership_updater.go b/roomserver/storage/shared/membership_updater.go index 5ddf6d84..5955844f 100644 --- a/roomserver/storage/shared/membership_updater.go +++ b/roomserver/storage/shared/membership_updater.go @@ -9,7 +9,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -type membershipUpdater struct { +type MembershipUpdater struct { transaction d *Database roomNID types.RoomNID @@ -18,21 +18,9 @@ type membershipUpdater struct { } func NewMembershipUpdater( - ctx context.Context, d *Database, roomID, targetUserID string, + ctx context.Context, d *Database, txn *sql.Tx, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, - useTxns bool, -) (types.MembershipUpdater, error) { - txn, err := d.DB.Begin() - if err != nil { - return nil, err - } - succeeded := false - defer func() { - if !succeeded { - txn.Rollback() // nolint: errcheck - } - }() - +) (*MembershipUpdater, error) { roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion) if err != nil { return nil, err @@ -43,17 +31,7 @@ func NewMembershipUpdater( return nil, err } - updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) - if err != nil { - return nil, err - } - - succeeded = true - if !useTxns { - txn.Commit() // nolint: errcheck - updater.transaction.txn = nil - } - return updater, nil + return d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) } func (d *Database) membershipUpdaterTxn( @@ -62,7 +40,7 @@ func (d *Database) membershipUpdaterTxn( roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, targetLocal bool, -) (*membershipUpdater, error) { +) (*MembershipUpdater, error) { if err := d.MembershipTable.InsertMembership(ctx, txn, roomNID, targetUserNID, targetLocal); err != nil { return nil, err @@ -73,28 +51,28 @@ func (d *Database) membershipUpdaterTxn( return nil, err } - return &membershipUpdater{ + return &MembershipUpdater{ transaction{ctx, txn}, d, roomNID, targetUserNID, membership, }, nil } // IsInvite implements types.MembershipUpdater -func (u *membershipUpdater) IsInvite() bool { +func (u *MembershipUpdater) IsInvite() bool { return u.membership == tables.MembershipStateInvite } // IsJoin implements types.MembershipUpdater -func (u *membershipUpdater) IsJoin() bool { +func (u *MembershipUpdater) IsJoin() bool { return u.membership == tables.MembershipStateJoin } // IsLeave implements types.MembershipUpdater -func (u *membershipUpdater) IsLeave() bool { +func (u *MembershipUpdater) IsLeave() bool { return u.membership == tables.MembershipStateLeaveOrBan } // SetToInvite implements types.MembershipUpdater -func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) { +func (u *MembershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, error) { senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, event.Sender()) if err != nil { return false, err @@ -116,7 +94,7 @@ func (u *membershipUpdater) SetToInvite(event gomatrixserverlib.Event) (bool, er } // SetToJoin implements types.MembershipUpdater -func (u *membershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) { +func (u *MembershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) { var inviteEventIDs []string senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID) @@ -153,7 +131,7 @@ func (u *membershipUpdater) SetToJoin(senderUserID string, eventID string, isUpd } // SetToLeave implements types.MembershipUpdater -func (u *membershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) { +func (u *MembershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) { senderUserNID, err := u.d.assignStateKeyNID(u.ctx, u.txn, senderUserID) if err != nil { return nil, err diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 5494e465..00179e33 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -332,14 +332,22 @@ func (d *Database) GetTransactionEventID( func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, -) (types.MembershipUpdater, error) { - return NewMembershipUpdater(ctx, d, roomID, targetUserID, targetLocal, roomVersion, true) +) (*MembershipUpdater, error) { + txn, err := d.DB.Begin() + if err != nil { + return nil, err + } + return NewMembershipUpdater(ctx, d, txn, roomID, targetUserID, targetLocal, roomVersion) } func (d *Database) GetLatestEventsForUpdate( ctx context.Context, roomNID types.RoomNID, -) (types.RoomRecentEventsUpdater, error) { - return NewRoomRecentEventsUpdater(d, ctx, roomNID, true) +) (*LatestEventsUpdater, error) { + txn, err := d.DB.Begin() + if err != nil { + return nil, err + } + return NewLatestEventsUpdater(ctx, d, txn, roomNID) } func (d *Database) StoreEvent( diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 3ac30ca3..0e39755c 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -326,9 +326,13 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference( iEventNIDs[k] = v } selectOrig := strings.Replace(bulkSelectStateAtEventAndReferenceSQL, "($1)", sqlutil.QueryVariadic(len(iEventNIDs)), 1) + selectPrep, err := s.db.Prepare(selectOrig) + if err != nil { + return nil, err + } ////////////// - rows, err := txn.QueryContext(ctx, selectOrig, iEventNIDs...) + rows, err := sqlutil.TxStmt(txn, selectPrep).QueryContext(ctx, iEventNIDs...) if err != nil { return nil, err } @@ -372,7 +376,7 @@ func (s *eventStatements) BulkSelectEventReference( iEventNIDs[k] = v } selectOrig := strings.Replace(bulkSelectEventReferenceSQL, "($1)", sqlutil.QueryVariadic(len(iEventNIDs)), 1) - selectPrep, err := txn.Prepare(selectOrig) + selectPrep, err := s.db.Prepare(selectOrig) if err != nil { return nil, err } @@ -471,7 +475,11 @@ func (s *eventStatements) SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, iEventIDs[i] = v } sqlStr := strings.Replace(selectMaxEventDepthSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1) - err := txn.QueryRowContext(ctx, sqlStr, iEventIDs...).Scan(&result) + sqlPrep, err := s.db.Prepare(sqlStr) + if err != nil { + return 0, err + } + err = sqlutil.TxStmt(txn, sqlPrep).QueryRowContext(ctx, iEventIDs...).Scan(&result) if err != nil { return 0, err } diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index ae3140d7..72431637 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -139,25 +139,25 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) { func (d *Database) GetLatestEventsForUpdate( ctx context.Context, roomNID types.RoomNID, -) (types.RoomRecentEventsUpdater, error) { +) (*shared.LatestEventsUpdater, error) { // TODO: Do not use transactions. We should be holding open this transaction but we cannot have // multiple write transactions on sqlite. The code will perform additional // write transactions independent of this one which will consistently cause // 'database is locked' errors. As sqlite doesn't support multi-process on the // same DB anyway, and we only execute updates sequentially, the only worries // are for rolling back when things go wrong. (atomicity) - return shared.NewRoomRecentEventsUpdater(&d.Database, ctx, roomNID, false) + return shared.NewLatestEventsUpdater(ctx, &d.Database, nil, roomNID) } func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, -) (updater types.MembershipUpdater, err error) { +) (*shared.MembershipUpdater, error) { // TODO: Do not use transactions. We should be holding open this transaction but we cannot have // multiple write transactions on sqlite. The code will perform additional // write transactions independent of this one which will consistently cause // 'database is locked' errors. As sqlite doesn't support multi-process on the // same DB anyway, and we only execute updates sequentially, the only worries // are for rolling back when things go wrong. (atomicity) - return shared.NewMembershipUpdater(ctx, &d.Database, roomID, targetUserID, targetLocal, roomVersion, false) + return shared.NewMembershipUpdater(ctx, &d.Database, nil, roomID, targetUserID, targetLocal, roomVersion) } diff --git a/roomserver/types/types.go b/roomserver/types/types.go index 241e1e15..cf4a86b6 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -16,7 +16,6 @@ package types import ( - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" ) @@ -140,68 +139,6 @@ type StateEntryList struct { StateEntries []StateEntry } -// A RoomRecentEventsUpdater is used to update the recent events in a room. -// (On postgresql this wraps a database transaction that holds a "FOR UPDATE" -// lock on the row in the rooms table holding the latest events for the room.) -type RoomRecentEventsUpdater interface { - // The room version of the room. - RoomVersion() gomatrixserverlib.RoomVersion - // The latest event IDs and state in the room. - LatestEvents() []StateAtEventAndReference - // The event ID of the latest event written to the output log in the room. - LastEventIDSent() string - // The current state of the room. - CurrentStateSnapshotNID() StateSnapshotNID - // Store the previous events referenced by an event. - // This adds the event NID to an entry in the database for each of the previous events. - // If there isn't an entry for one of previous events then an entry is created. - // If the entry already lists the event NID as a referrer then the entry unmodified. - // (i.e. the operation is idempotent) - StorePreviousEvents(eventNID EventNID, previousEventReferences []gomatrixserverlib.EventReference) error - // Check whether the eventReference is already referenced by another matrix event. - IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) - // Set the list of latest events for the room. - // This replaces the current list stored in the database with the given list - SetLatestEvents( - roomNID RoomNID, latest []StateAtEventAndReference, lastEventNIDSent EventNID, - currentStateSnapshotNID StateSnapshotNID, - ) error - // Check if the event has already be written to the output logs. - HasEventBeenSent(eventNID EventNID) (bool, error) - // Mark the event as having been sent to the output logs. - MarkEventAsSent(eventNID EventNID) error - // Build a membership updater for the target user in this room. - // It will share the same transaction as this updater. - MembershipUpdater(targetUserNID EventStateKeyNID, isTargetLocalUser bool) (MembershipUpdater, error) - // Implements Transaction so it can be committed or rolledback - sqlutil.Transaction -} - -// A MembershipUpdater is used to update the membership of a user in a room. -// (On postgresql this wraps a database transaction that holds a "FOR UPDATE" -// lock on the row in the membership table for this user in the room) -// The caller should call one of SetToInvite, SetToJoin or SetToLeave once to -// make the update, or none of them if no update is required. -type MembershipUpdater interface { - // True if the target user is invited to the room before updating. - IsInvite() bool - // True if the target user is joined to the room before updating. - IsJoin() bool - // True if the target user is not invited or joined to the room before updating. - IsLeave() bool - // Set the state to invite. - // Returns whether this invite needs to be sent - SetToInvite(event gomatrixserverlib.Event) (needsSending bool, err error) - // Set the state to join or updates the event ID in the database. - // Returns a list of invite event IDs that this state change retired. - SetToJoin(senderUserID string, eventID string, isUpdate bool) (inviteEventIDs []string, err error) - // Set the state to leave. - // Returns a list of invite event IDs that this state change retired. - SetToLeave(senderUserID string, eventID string) (inviteEventIDs []string, err error) - // Implements Transaction so it can be committed or rolledback. - sqlutil.Transaction -} - // A MissingEventError is an error that happened because the roomserver was // missing requested events from its database. type MissingEventError string