Track which events are the latest events in the room. (#14)
* Track which events are the latest events in the room.
This commit is contained in:
parent
966d3b95b7
commit
51f9741b34
8 changed files with 368 additions and 10 deletions
|
@ -34,6 +34,10 @@ type RoomEventDatabase interface {
|
|||
AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
|
||||
// Set the state at an event.
|
||||
SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error
|
||||
// Lookup the latest events in a room in preparation for an update.
|
||||
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
|
||||
// If this returns an error then no further action is required.
|
||||
GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error)
|
||||
}
|
||||
|
||||
func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
||||
|
@ -85,10 +89,12 @@ func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
|||
db.SetState(stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
|
||||
}
|
||||
|
||||
// Update the extremities of the event graph for the room
|
||||
if err := updateLatestEvents(db, roomNID, stateAtEvent, event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// * Calcuate the state at the event if necessary.
|
||||
// * Store the state at the event.
|
||||
// * Update the extremities of the event graph for the room
|
||||
// * Caculate the new current state for the room if the forward extremities have changed.
|
||||
// * Work out the delta between the new current state and the previous current state.
|
||||
// * Work out the visibility of the event.
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
package input
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// updateLatestEvents updates the list of latest events for this room.
|
||||
// The latest events are the events that aren't referenced by another event in the database:
|
||||
//
|
||||
// Time goes down the page. 1 is the m.room.create event (root).
|
||||
//
|
||||
// 1 After storing 1 the latest events are {1}
|
||||
// | After storing 2 the latest events are {2}
|
||||
// 2 After storing 3 the latest events are {3}
|
||||
// / \ After storing 4 the latest events are {3,4}
|
||||
// 3 4 After storing 5 the latest events are {5,4}
|
||||
// | | After storing 6 the latest events are {5,6}
|
||||
// 5 6 <--- latest After storing 7 the latest events are {6,7}
|
||||
// |
|
||||
// 7 <----- latest
|
||||
//
|
||||
func updateLatestEvents(
|
||||
db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
) (err error) {
|
||||
oldLatest, updater, err := db.GetLatestEventsForUpdate(roomNID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err == nil {
|
||||
// Commit if there wasn't an error.
|
||||
// Set the returned err value if we encounter an error committing.
|
||||
// This only works because err is a named return.
|
||||
err = updater.Commit()
|
||||
} else {
|
||||
// Ignore any error we get rolling back since we don't want to
|
||||
// clobber the current error
|
||||
// TODO: log the error here.
|
||||
updater.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
err = doUpdateLatestEvents(updater, oldLatest, roomNID, stateAtEvent, event)
|
||||
return
|
||||
}
|
||||
|
||||
func doUpdateLatestEvents(
|
||||
updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||
) error {
|
||||
var err error
|
||||
var prevEvents []gomatrixserverlib.EventReference
|
||||
prevEvents = event.PrevEvents()
|
||||
|
||||
if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if this event references any of the latest events in the room.
|
||||
var newLatest []types.StateAtEventAndReference
|
||||
for _, l := range oldLatest {
|
||||
for _, prevEvent := range prevEvents {
|
||||
if l.EventID == prevEvent.EventID && bytes.Compare(l.EventSHA256, prevEvent.EventSHA256) == 0 {
|
||||
// This event can be removed from the latest events cause we've found an event that references it.
|
||||
// (If an event is referenced by another event then it can't be one of the latest events in the room
|
||||
// because we have an event that comes after it)
|
||||
continue
|
||||
}
|
||||
// Keep the event in the latest events.
|
||||
newLatest = append(newLatest, l)
|
||||
}
|
||||
}
|
||||
|
||||
eventReference := event.EventReference()
|
||||
// Check if this event is already referenced by another event in the room.
|
||||
var alreadyReferenced bool
|
||||
if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !alreadyReferenced {
|
||||
// This event is not referenced by any of the events in the room.
|
||||
// Add it to the latest events
|
||||
newLatest = append(newLatest, types.StateAtEventAndReference{
|
||||
StateAtEvent: stateAtEvent,
|
||||
EventReference: eventReference,
|
||||
})
|
||||
}
|
||||
|
||||
if err = updater.SetLatestEvents(roomNID, newLatest); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -65,11 +65,16 @@ const bulkSelectStateAtEventByIDSQL = "" +
|
|||
const updateEventStateSQL = "" +
|
||||
"UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1"
|
||||
|
||||
const bulkSelectStateAtEventAndReferenceSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
|
||||
" FROM events WHERE event_nid = ANY($1)"
|
||||
|
||||
type eventStatements struct {
|
||||
insertEventStmt *sql.Stmt
|
||||
bulkSelectStateEventByIDStmt *sql.Stmt
|
||||
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
||||
updateEventStateStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
bulkSelectStateEventByIDStmt *sql.Stmt
|
||||
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
||||
updateEventStateStmt *sql.Stmt
|
||||
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *eventStatements) prepare(db *sql.DB) (err error) {
|
||||
|
@ -89,6 +94,9 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.updateEventStateStmt, err = db.Prepare(updateEventStateSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -176,3 +184,43 @@ func (s *eventStatements) updateEventState(eventNID types.EventNID, stateNID typ
|
|||
_, err := s.updateEventStateStmt.Exec(int64(eventNID), int64(stateNID))
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) {
|
||||
nids := make([]int64, len(eventNIDs))
|
||||
for i := range eventNIDs {
|
||||
nids[i] = int64(eventNIDs[i])
|
||||
}
|
||||
rows, err := txn.Stmt(s.bulkSelectStateAtEventAndReferenceStmt).Query(pq.Int64Array(nids))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
results := make([]types.StateAtEventAndReference, len(eventNIDs))
|
||||
i := 0
|
||||
for ; rows.Next(); i++ {
|
||||
var (
|
||||
eventTypeNID int64
|
||||
eventStateKeyNID int64
|
||||
eventNID int64
|
||||
stateSnapshotNID int64
|
||||
eventID string
|
||||
eventSHA256 []byte
|
||||
)
|
||||
if err = rows.Scan(
|
||||
&eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := &results[i]
|
||||
result.EventTypeNID = types.EventTypeNID(eventTypeNID)
|
||||
result.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID)
|
||||
result.EventNID = types.EventNID(eventNID)
|
||||
result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID)
|
||||
result.EventID = eventID
|
||||
result.EventSHA256 = eventSHA256
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
||||
const previousEventSchema = `
|
||||
-- The previous events table stores the event_ids referenced by the events
|
||||
-- stored in the events table.
|
||||
-- This is used to tell if a new event is already referenced by an event in
|
||||
-- the database.
|
||||
CREATE TABLE IF NOT EXISTS previous_events (
|
||||
-- The string event ID taken from the prev_events key of an event.
|
||||
previous_event_id TEXT NOT NULL,
|
||||
-- The SHA256 reference hash taken from the prev_events key of an event.
|
||||
previous_reference_sha256 BYTEA NOT NULL,
|
||||
-- A list of numeric event IDs of events that reference this prev_event.
|
||||
event_nids BIGINT[] NOT NULL,
|
||||
CONSTRAINT previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256)
|
||||
);
|
||||
`
|
||||
|
||||
// Insert an entry into the previous_events table.
|
||||
// If there is already an entry indicating that an event references that previous event then
|
||||
// add the event NID to the list to indicate that this event references that previous event as well.
|
||||
// This should only be modified while holding a "FOR UPDATE" lock on the row in the rooms table for this room.
|
||||
// The lock is necessary to avoid data races when checking whether an event is already referenced by another event.
|
||||
const insertPreviousEventSQL = "" +
|
||||
"INSERT INTO previous_events" +
|
||||
" (previous_event_id, previous_reference_sha256, event_nids)" +
|
||||
" VALUES ($1, $2, array_append('{}'::bigint[], $3))" +
|
||||
" ON CONFLICT ON CONSTRAINT previous_event_id_unique" +
|
||||
" DO UPDATE SET event_nids = array_append(previous_events.event_nids, $3)" +
|
||||
" WHERE $3 != ALL(previous_events.event_nids)"
|
||||
|
||||
// Check if the event is referenced by another event in the table.
|
||||
// This should only be done while holding a "FOR UPDATE" lock on the row in the rooms table for this room.
|
||||
const selectPreviousEventExistsSQL = "" +
|
||||
"SELECT 1 FROM previous_events" +
|
||||
" WHERE previous_event_id = $1 AND previous_reference_sha256 = $2"
|
||||
|
||||
type previousEventStatements struct {
|
||||
insertPreviousEventStmt *sql.Stmt
|
||||
selectPreviousEventExistsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *previousEventStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(previousEventSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertPreviousEventStmt, err = db.Prepare(insertPreviousEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectPreviousEventExistsStmt, err = db.Prepare(selectPreviousEventExistsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *previousEventStatements) insertPreviousEvent(txn *sql.Tx, previousEventID string, previousEventReferenceSHA256 []byte, eventNID types.EventNID) error {
|
||||
_, err := txn.Stmt(s.insertPreviousEventStmt).Exec(previousEventID, previousEventReferenceSHA256, int64(eventNID))
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if the event reference exists
|
||||
// Returns sql.ErrNoRows if the event reference doesn't exist.
|
||||
func (s *previousEventStatements) selectPreviousEventExists(txn *sql.Tx, eventID string, eventReferenceSHA256 []byte) error {
|
||||
var ok int64
|
||||
return txn.Stmt(s.selectPreviousEventExistsStmt).QueryRow(eventID, eventReferenceSHA256).Scan(&ok)
|
||||
}
|
|
@ -2,6 +2,7 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
||||
|
@ -11,7 +12,11 @@ CREATE TABLE IF NOT EXISTS rooms (
|
|||
-- Local numeric ID for the room.
|
||||
room_nid BIGINT PRIMARY KEY DEFAULT nextval('room_nid_seq'),
|
||||
-- Textual ID for the room.
|
||||
room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE
|
||||
room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE,
|
||||
-- The most recent events in the room that aren't referenced by another event.
|
||||
-- This list may empty if the server hasn't joined the room yet.
|
||||
-- (The server will be in that state while it stores the events for the initial state of the room)
|
||||
latest_event_nids BIGINT[] NOT NULL
|
||||
);
|
||||
`
|
||||
|
||||
|
@ -25,9 +30,17 @@ const insertRoomNIDSQL = "" +
|
|||
const selectRoomNIDSQL = "" +
|
||||
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
||||
|
||||
const selectLatestEventNIDsSQL = "" +
|
||||
"SELECT latest_event_nids FROM rooms WHERE room_nid = $1 FOR UPDATE"
|
||||
|
||||
const updateLatestEventNIDsSQL = "" +
|
||||
"UPDATE rooms SET latest_event_nids = $2 WHERE room_nid = $1"
|
||||
|
||||
type roomStatements struct {
|
||||
insertRoomNIDStmt *sql.Stmt
|
||||
selectRoomNIDStmt *sql.Stmt
|
||||
insertRoomNIDStmt *sql.Stmt
|
||||
selectRoomNIDStmt *sql.Stmt
|
||||
selectLatestEventNIDsStmt *sql.Stmt
|
||||
updateLatestEventNIDsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *roomStatements) prepare(db *sql.DB) (err error) {
|
||||
|
@ -41,6 +54,12 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectRoomNIDStmt, err = db.Prepare(selectRoomNIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectLatestEventNIDsStmt, err = db.Prepare(selectLatestEventNIDsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.updateLatestEventNIDsStmt, err = db.Prepare(updateLatestEventNIDsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -55,3 +74,25 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) {
|
|||
err := s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID)
|
||||
return types.RoomNID(roomNID), err
|
||||
}
|
||||
|
||||
func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, error) {
|
||||
var nids pq.Int64Array
|
||||
err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eventNIDs := make([]types.EventNID, len(nids))
|
||||
for i := range nids {
|
||||
eventNIDs[i] = types.EventNID(nids[i])
|
||||
}
|
||||
return eventNIDs, nil
|
||||
}
|
||||
|
||||
func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID) error {
|
||||
nids := make([]int64, len(eventNIDs))
|
||||
for i := range eventNIDs {
|
||||
nids[i] = int64(eventNIDs[i])
|
||||
}
|
||||
_, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids))
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ type statements struct {
|
|||
eventJSONStatements
|
||||
stateSnapshotStatements
|
||||
stateBlockStatements
|
||||
previousEventStatements
|
||||
}
|
||||
|
||||
func (s *statements) prepare(db *sql.DB) error {
|
||||
|
@ -50,5 +51,9 @@ func (s *statements) prepare(db *sql.DB) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err = s.previousEventStatements.prepare(db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -195,3 +195,63 @@ func (d *Database) StateBlockNIDs(stateNIDs []types.StateSnapshotNID) ([]types.S
|
|||
func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) {
|
||||
return d.statements.bulkSelectStateDataEntries(stateBlockNIDs)
|
||||
}
|
||||
|
||||
// GetLatestEventsForUpdate implements input.EventDatabase
|
||||
func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) {
|
||||
txn, err := d.db.Begin()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
eventNIDs, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID)
|
||||
if err != nil {
|
||||
txn.Rollback()
|
||||
return nil, nil, err
|
||||
}
|
||||
stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(txn, eventNIDs)
|
||||
if err != nil {
|
||||
txn.Rollback()
|
||||
return nil, nil, err
|
||||
}
|
||||
return stateAndRefs, &roomRecentEventsUpdater{txn, d}, nil
|
||||
}
|
||||
|
||||
type roomRecentEventsUpdater struct {
|
||||
txn *sql.Tx
|
||||
d *Database
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
|
||||
for _, ref := range previousEventReferences {
|
||||
if err := u.d.statements.insertPreviousEvent(u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) {
|
||||
err := u.d.statements.selectPreviousEventExists(u.txn, eventReference.EventID, eventReference.EventSHA256)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference) error {
|
||||
eventNIDs := make([]types.EventNID, len(latest))
|
||||
for i := range latest {
|
||||
eventNIDs[i] = latest[i].EventNID
|
||||
}
|
||||
return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs)
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) Commit() error {
|
||||
return u.txn.Commit()
|
||||
}
|
||||
|
||||
func (u *roomRecentEventsUpdater) Rollback() error {
|
||||
return u.txn.Rollback()
|
||||
}
|
||||
|
|
|
@ -79,6 +79,15 @@ func (s StateAtEvent) IsStateEvent() bool {
|
|||
return s.EventStateKeyNID != 0
|
||||
}
|
||||
|
||||
// StateAtEventAndReference is StateAtEvent and gomatrixserverlib.EventReference glued together.
|
||||
// It is used when looking up the latest events in a room in the database.
|
||||
// The gomatrixserverlib.EventReference is used to check whether a new event references the event.
|
||||
// The StateAtEvent is used to construct the current state of the room from the latest events.
|
||||
type StateAtEventAndReference struct {
|
||||
StateAtEvent
|
||||
gomatrixserverlib.EventReference
|
||||
}
|
||||
|
||||
// An Event is a gomatrixserverlib.Event with the numeric event ID attached.
|
||||
// It is when performing bulk event lookup in the database.
|
||||
type Event struct {
|
||||
|
@ -119,3 +128,24 @@ type StateEntryList struct {
|
|||
StateBlockNID StateBlockNID
|
||||
StateEntries []StateEntry
|
||||
}
|
||||
|
||||
// A RoomRecentEventsUpdater is used to update the recent events in a room.
|
||||
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
|
||||
// lock on the row holding the latest events for the room.)
|
||||
type RoomRecentEventsUpdater interface {
|
||||
// Store the previous events referenced by an event.
|
||||
// This adds the event NID to an entry in the database for each of the previous events.
|
||||
// If there isn't an entry for one of previous events then an entry is created.
|
||||
// If the entry already lists the event NID as a referrer then the entry unmodified.
|
||||
// (i.e. the operation is idempotent)
|
||||
StorePreviousEvents(eventNID EventNID, previousEventReferences []gomatrixserverlib.EventReference) error
|
||||
// Check whether the eventReference is already referenced by another matrix event.
|
||||
IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error)
|
||||
// Set the list of latest events for the room.
|
||||
// This replaces the current list stored in the database with the given list
|
||||
SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference) error
|
||||
// Commit the transaction
|
||||
Commit() error
|
||||
// Rollback the transaction.
|
||||
Rollback() error
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue