diff --git a/syncapi/storage/postgres/backward_extremities_table.go b/syncapi/storage/postgres/backward_extremities_table.go deleted file mode 100644 index cb362964..00000000 --- a/syncapi/storage/postgres/backward_extremities_table.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2018 New Vector Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgres - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/common" -) - -// The purpose of this table is to keep track of backwards extremities for a room. -// Backwards extremities are the earliest (DAG-wise) known events which we have -// the entire event JSON. These event IDs are used in federation requests to fetch -// even earlier events. -// -// We persist the previous event IDs as well, one per row, so when we do fetch even -// earlier events we can simply delete rows which referenced it. Consider the graph: -// A -// | Event C has 1 prev_event ID: A. -// B C -// |___| Event D has 2 prev_event IDs: B and C. -// | -// D -// The earliest known event we have is D, so this table has 2 rows. -// A backfill request gives us C but not B. We delete rows where prev_event=C. This -// still means that D is a backwards extremity as we do not have event B. However, event -// C is *also* a backwards extremity at this point as we do not have event A. Later, -// when we fetch event B, we delete rows where prev_event=B. This then removes D as -// a backwards extremity because there are no more rows with event_id=B. -const backwardExtremitiesSchema = ` --- Stores output room events received from the roomserver. -CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( - -- The 'room_id' key for the event. - room_id TEXT NOT NULL, - -- The event ID for the last known event. This is the backwards extremity. - event_id TEXT NOT NULL, - -- The prev_events for the last known event. This is used to update extremities. - prev_event_id TEXT NOT NULL, - - PRIMARY KEY(room_id, event_id, prev_event_id) -); -` - -const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT DO NOTHING" - -const selectBackwardExtremitiesForRoomSQL = "" + - "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" - -const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" - -type backwardExtremitiesStatements struct { - insertBackwardExtremityStmt *sql.Stmt - selectBackwardExtremitiesForRoomStmt *sql.Stmt - deleteBackwardExtremityStmt *sql.Stmt -} - -func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(backwardExtremitiesSchema) - if err != nil { - return - } - if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil { - return - } - if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { - return - } - if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { - return - } - return -} - -func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, -) (err error) { - _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) - return -} - -func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( - ctx context.Context, roomID string, -) (eventIDs []string, err error) { - rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) - if err != nil { - return - } - defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") - - for rows.Next() { - var eID string - if err = rows.Scan(&eID); err != nil { - return - } - - eventIDs = append(eventIDs, eID) - } - - return eventIDs, rows.Err() -} - -func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, knownEventID string, -) (err error) { - _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) - return -} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 1e078ef4..9d61ccfc 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -32,6 +32,7 @@ import ( _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -56,7 +57,7 @@ type SyncServerDatasource struct { invites inviteEventsStatements eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements - backwardExtremities backwardExtremitiesStatements + backwardExtremities tables.BackwardsExtremities } // NewSyncServerDatasource creates a new sync server database @@ -75,16 +76,17 @@ func NewSyncServerDatasource(dbDataSourceName string) (*SyncServerDatasource, er if err = d.events.prepare(d.db); err != nil { return nil, err } - if err := d.roomstate.prepare(d.db); err != nil { + if err = d.roomstate.prepare(d.db); err != nil { return nil, err } - if err := d.invites.prepare(d.db); err != nil { + if err = d.invites.prepare(d.db); err != nil { return nil, err } - if err := d.topology.prepare(d.db); err != nil { + if err = d.topology.prepare(d.db); err != nil { return nil, err } - if err := d.backwardExtremities.prepare(d.db); err != nil { + d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.PostgresBackwardsExtremitiesStatements{}) + if err != nil { return nil, err } d.eduCache = cache.New() @@ -116,7 +118,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { - if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } @@ -137,7 +139,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // If the event is missing, consider it a backward extremity. if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { + if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { return err } } @@ -314,7 +316,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.Paginati func (d *SyncServerDatasource) BackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (backwardExtremities []string, err error) { - return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID) + return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) } // MaxTopologicalPosition returns the highest topological position for a given diff --git a/syncapi/storage/sqlite3/backward_extremities_table.go b/syncapi/storage/sqlite3/backward_extremities_table.go deleted file mode 100644 index 3d8cb91f..00000000 --- a/syncapi/storage/sqlite3/backward_extremities_table.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2018 New Vector Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/common" -) - -// The purpose of this table is to keep track of backwards extremities for a room. -// Backwards extremities are the earliest (DAG-wise) known events which we have -// the entire event JSON. These event IDs are used in federation requests to fetch -// even earlier events. -// -// We persist the previous event IDs as well, one per row, so when we do fetch even -// earlier events we can simply delete rows which referenced it. Consider the graph: -// A -// | Event C has 1 prev_event ID: A. -// B C -// |___| Event D has 2 prev_event IDs: B and C. -// | -// D -// The earliest known event we have is D, so this table has 2 rows. -// A backfill request gives us C but not B. We delete rows where prev_event=C. This -// still means that D is a backwards extremity as we do not have event B. However, event -// C is *also* a backwards extremity at this point as we do not have event A. Later, -// when we fetch event B, we delete rows where prev_event=B. This then removes D as -// a backwards extremity because there are no more rows with event_id=B. -const backwardExtremitiesSchema = ` --- Stores output room events received from the roomserver. -CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( - -- The 'room_id' key for the event. - room_id TEXT NOT NULL, - -- The event ID for the last known event. This is the backwards extremity. - event_id TEXT NOT NULL, - -- The prev_events for the last known event. This is used to update extremities. - prev_event_id TEXT NOT NULL, - - PRIMARY KEY(room_id, event_id, prev_event_id) -); -` - -const insertBackwardExtremitySQL = "" + - "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" - -const selectBackwardExtremitiesForRoomSQL = "" + - "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" - -const deleteBackwardExtremitySQL = "" + - "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" - -type backwardExtremitiesStatements struct { - insertBackwardExtremityStmt *sql.Stmt - selectBackwardExtremitiesForRoomStmt *sql.Stmt - deleteBackwardExtremityStmt *sql.Stmt -} - -func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(backwardExtremitiesSchema) - if err != nil { - return - } - if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil { - return - } - if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { - return - } - if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { - return - } - return -} - -func (s *backwardExtremitiesStatements) insertsBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, -) (err error) { - _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) - return -} - -func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( - ctx context.Context, roomID string, -) (eventIDs []string, err error) { - rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) - if err != nil { - return - } - defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") - - for rows.Next() { - var eID string - if err = rows.Scan(&eID); err != nil { - return - } - - eventIDs = append(eventIDs, eID) - } - - return eventIDs, rows.Err() -} - -func (s *backwardExtremitiesStatements) deleteBackwardExtremity( - ctx context.Context, txn *sql.Tx, roomID, knownEventID string, -) (err error) { - _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) - return -} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index cdfd29b8..35774830 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -35,6 +35,7 @@ import ( "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -60,7 +61,7 @@ type SyncServerDatasource struct { invites inviteEventsStatements eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements - backwardExtremities backwardExtremitiesStatements + backwardExtremities tables.BackwardsExtremities } // NewSyncServerDatasource creates a new sync server database @@ -102,16 +103,17 @@ func (d *SyncServerDatasource) prepare() (err error) { if err = d.events.prepare(d.db, &d.streamID); err != nil { return err } - if err := d.roomstate.prepare(d.db, &d.streamID); err != nil { + if err = d.roomstate.prepare(d.db, &d.streamID); err != nil { return err } - if err := d.invites.prepare(d.db, &d.streamID); err != nil { + if err = d.invites.prepare(d.db, &d.streamID); err != nil { return err } - if err := d.topology.prepare(d.db); err != nil { + if err = d.topology.prepare(d.db); err != nil { return err } - if err := d.backwardExtremities.prepare(d.db); err != nil { + d.backwardExtremities, err = tables.NewBackwardsExtremities(d.db, &tables.SqliteBackwardsExtremitiesStatements{}) + if err != nil { return err } return nil @@ -142,7 +144,7 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([ // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error { - if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { + if err := d.backwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil { return err } @@ -163,7 +165,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // If the event is missing, consider it a backward extremity. if !found { - if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { + if err = d.backwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil { return err } } @@ -344,7 +346,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi func (d *SyncServerDatasource) BackwardExtremitiesForRoom( ctx context.Context, roomID string, ) (backwardExtremities []string, err error) { - return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID) + return d.backwardExtremities.SelectBackwardExtremitiesForRoom(ctx, roomID) } // MaxTopologicalPosition returns the highest topological position for a given diff --git a/syncapi/storage/tables/backward_extremities.go b/syncapi/storage/tables/backward_extremities.go new file mode 100644 index 00000000..babd9aaa --- /dev/null +++ b/syncapi/storage/tables/backward_extremities.go @@ -0,0 +1,175 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tables + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" +) + +// BackwardsExtremitiesStatements contains the SQL statements to implement. +// See BackwardsExtremities to see the parameter and response types. +type BackwardsExtremitiesStatements interface { + Schema() string + InsertBackwardExtremity() string + SelectBackwardExtremitiesForRoom() string + DeleteBackwardExtremity() string +} + +type PostgresBackwardsExtremitiesStatements struct{} + +func (s *PostgresBackwardsExtremitiesStatements) Schema() string { + return `-- Stores output room events received from the roomserver. + CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The event ID for the last known event. This is the backwards extremity. + event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, + + PRIMARY KEY(room_id, event_id, prev_event_id) + ); + ` +} +func (s *PostgresBackwardsExtremitiesStatements) InsertBackwardExtremity() string { + return "" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT DO NOTHING" +} +func (s *PostgresBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string { + return "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" +} +func (s *PostgresBackwardsExtremitiesStatements) DeleteBackwardExtremity() string { + return "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" +} + +type SqliteBackwardsExtremitiesStatements struct{} + +func (s *SqliteBackwardsExtremitiesStatements) Schema() string { + return `-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The event ID for the last known event. This is the backwards extremity. + event_id TEXT NOT NULL, + -- The prev_events for the last known event. This is used to update extremities. + prev_event_id TEXT NOT NULL, + + PRIMARY KEY(room_id, event_id, prev_event_id) +); +` +} + +func (s *SqliteBackwardsExtremitiesStatements) InsertBackwardExtremity() string { + return "" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" + + " VALUES ($1, $2, $3)" + + " ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING" +} + +func (s *SqliteBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string { + return "" + + "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1" +} + +func (s *SqliteBackwardsExtremitiesStatements) DeleteBackwardExtremity() string { + return "" + + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" +} + +// BackwardsExtremities keeps track of backwards extremities for a room. +// Backwards extremities are the earliest (DAG-wise) known events which we have +// the entire event JSON. These event IDs are used in federation requests to fetch +// even earlier events. +// +// We persist the previous event IDs as well, one per row, so when we do fetch even +// earlier events we can simply delete rows which referenced it. Consider the graph: +// A +// | Event C has 1 prev_event ID: A. +// B C +// |___| Event D has 2 prev_event IDs: B and C. +// | +// D +// The earliest known event we have is D, so this table has 2 rows. +// A backfill request gives us C but not B. We delete rows where prev_event=C. This +// still means that D is a backwards extremity as we do not have event B. However, event +// C is *also* a backwards extremity at this point as we do not have event A. Later, +// when we fetch event B, we delete rows where prev_event=B. This then removes D as +// a backwards extremity because there are no more rows with event_id=B. +type BackwardsExtremities struct { + insertBackwardExtremityStmt *sql.Stmt + selectBackwardExtremitiesForRoomStmt *sql.Stmt + deleteBackwardExtremityStmt *sql.Stmt +} + +// NewBackwardsExtremities prepares the table +func NewBackwardsExtremities(db *sql.DB, stmts BackwardsExtremitiesStatements) (table BackwardsExtremities, err error) { + _, err = db.Exec(stmts.Schema()) + if err != nil { + return + } + if table.insertBackwardExtremityStmt, err = db.Prepare(stmts.InsertBackwardExtremity()); err != nil { + return + } + if table.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(stmts.SelectBackwardExtremitiesForRoom()); err != nil { + return + } + if table.deleteBackwardExtremityStmt, err = db.Prepare(stmts.DeleteBackwardExtremity()); err != nil { + return + } + return +} + +// InsertsBackwardExtremity inserts a new backwards extremity. +func (s *BackwardsExtremities) InsertsBackwardExtremity( + ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string, +) (err error) { + _, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID) + return +} + +// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room. +func (s *BackwardsExtremities) SelectBackwardExtremitiesForRoom( + ctx context.Context, roomID string, +) (eventIDs []string, err error) { + rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) + if err != nil { + return + } + defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed") + + for rows.Next() { + var eID string + if err = rows.Scan(&eID); err != nil { + return + } + + eventIDs = append(eventIDs, eID) + } + + return eventIDs, rows.Err() +} + +// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed. +func (s *BackwardsExtremities) DeleteBackwardExtremity( + ctx context.Context, txn *sql.Tx, roomID, knownEventID string, +) (err error) { + _, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID) + return +}