Store current room state (#55)
parent
a3c66f7fa0
commit
c1c837516a
|
@ -0,0 +1,91 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/events"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const currentRoomStateSchema = `
|
||||
-- Stores the current room state for every room.
|
||||
CREATE TABLE IF NOT EXISTS current_room_state (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
event_id TEXT NOT NULL,
|
||||
-- The state event type e.g 'm.room.member'
|
||||
type TEXT NOT NULL,
|
||||
-- The state_key value for this state event e.g ''
|
||||
state_key TEXT NOT NULL,
|
||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||
event_json TEXT NOT NULL,
|
||||
-- The 'content.membership' value if this event is an m.room.member event. For other
|
||||
-- events, this will be NULL.
|
||||
membership TEXT,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
-- for event deletion
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON current_room_state(event_id);
|
||||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||
" ON CONFLICT ON CONSTRAINT room_state_unique" +
|
||||
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6"
|
||||
|
||||
const deleteRoomStateByEventIDSQL = "" +
|
||||
"DELETE FROM current_room_state WHERE event_id = $1"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(currentRoomStateSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) UpdateRoomState(txn *sql.Tx, added []gomatrixserverlib.Event, removedEventIDs []string) error {
|
||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||
for _, eventID := range removedEventIDs {
|
||||
_, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, event := range added {
|
||||
if event.StateKey() == nil {
|
||||
// ignore non state events
|
||||
continue
|
||||
}
|
||||
var membership *string
|
||||
if event.Type() == "m.room.member" {
|
||||
var memberContent events.MemberContent
|
||||
if err := json.Unmarshal(event.Content(), &memberContent); err != nil {
|
||||
return err
|
||||
}
|
||||
membership = &memberContent.Membership
|
||||
}
|
||||
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
|
||||
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -2,8 +2,10 @@ package storage
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const outputRoomEventsSchema = `
|
||||
|
@ -13,21 +15,30 @@ CREATE TABLE IF NOT EXISTS output_room_events (
|
|||
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
|
||||
-- This isn't a problem for us since we just want to order by this field.
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
-- The event ID for the event
|
||||
event_id TEXT NOT NULL,
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||
event_json TEXT NOT NULL,
|
||||
-- A list of event IDs which represent a delta of added/removed room state.
|
||||
add_state_ids TEXT[] NOT NULL,
|
||||
remove_state_ids TEXT[] NOT NULL
|
||||
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL
|
||||
-- if there is no delta.
|
||||
add_state_ids TEXT[],
|
||||
remove_state_ids TEXT[]
|
||||
);
|
||||
-- for event selection
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON output_room_events(event_id);
|
||||
`
|
||||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO output_room_events (room_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4)"
|
||||
"INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5)"
|
||||
|
||||
const selectEventsSQL = "" +
|
||||
"SELECT event_json FROM output_room_events WHERE event_id = ANY($1)"
|
||||
|
||||
type outputRoomEventsStatements struct {
|
||||
insertEventStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||
|
@ -38,11 +49,44 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs.
|
||||
func (s *outputRoomEventsStatements) InsertEvent(roomID string, eventJSON []byte, addState, removeState []string) error {
|
||||
_, err := s.insertEventStmt.Exec(roomID, eventJSON, pq.StringArray(addState), pq.StringArray(removeState))
|
||||
func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) error {
|
||||
_, err := txn.Stmt(s.insertEventStmt).Exec(
|
||||
event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
||||
// from the database.
|
||||
func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) {
|
||||
rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make([]gomatrixserverlib.Event, len(eventIDs))
|
||||
i := 0
|
||||
for ; rows.Next(); i++ {
|
||||
var eventBytes []byte
|
||||
if err := rows.Scan(&eventBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, ev)
|
||||
}
|
||||
if i != len(eventIDs) {
|
||||
return nil, fmt.Errorf("failed to map all event IDs to events: (%d != %d)", i, len(eventIDs))
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ type SyncServerDatabase struct {
|
|||
db *sql.DB
|
||||
partitions common.PartitionOffsetStatements
|
||||
events outputRoomEventsStatements
|
||||
roomstate currentRoomStateStatements
|
||||
}
|
||||
|
||||
// NewSyncServerDatabase creates a new sync server database
|
||||
|
@ -30,13 +31,44 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
|||
if err = events.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &SyncServerDatabase{db, partitions, events}, nil
|
||||
state := currentRoomStateStatements{}
|
||||
if err := state.prepare(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &SyncServerDatabase{db, partitions, events, state}, nil
|
||||
}
|
||||
|
||||
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
||||
// when generating the stream position for this event. Returns an error if there was a problem inserting this event.
|
||||
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) error {
|
||||
return d.events.InsertEvent(ev.RoomID(), ev.JSON(), addStateEventIDs, removeStateEventIDs)
|
||||
return runTransaction(d.db, func(txn *sql.Tx) error {
|
||||
if err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 {
|
||||
// Nothing to do, the event may have just been a message event.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update the current room state based on the added/removed state event IDs.
|
||||
// In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event.
|
||||
// However, conflict resolution may result in there being different events being added, or even some removed.
|
||||
if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() {
|
||||
// common case
|
||||
if err := d.roomstate.UpdateRoomState(txn, []gomatrixserverlib.Event{*ev}, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// uncommon case: we need to fetch the full event for each event ID mentioned, then update room state
|
||||
added, err := d.events.Events(txn, addStateEventIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs)
|
||||
})
|
||||
}
|
||||
|
||||
// PartitionOffsets implements common.PartitionStorer
|
||||
|
@ -48,3 +80,22 @@ func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionO
|
|||
func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error {
|
||||
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
|
||||
}
|
||||
|
||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||
txn, err := db.Begin()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
txn.Rollback()
|
||||
panic(r)
|
||||
} else if err != nil {
|
||||
txn.Rollback()
|
||||
} else {
|
||||
err = txn.Commit()
|
||||
}
|
||||
}()
|
||||
err = fn(txn)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -65,7 +65,12 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
|
||||
if err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs); err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithError(err).WithField("OutputRoomEvent", output).Panicf("roomserver output log: write event failure")
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
"add": output.AddsStateEventIDs,
|
||||
"del": output.RemovesStateEventIDs,
|
||||
}).Panicf("roomserver output log: write event failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue