Get the SQL working, and make it more idempotent (#17)
* Get the SQL working, and make it more idempotentmain
parent
c96d3eec0c
commit
a9b296c522
|
@ -58,15 +58,23 @@ func doUpdateLatestEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if this event references any of the latest events in the room.
|
// Check if this event references any of the latest events in the room.
|
||||||
|
var alreadyInLatest bool
|
||||||
var newLatest []types.StateAtEventAndReference
|
var newLatest []types.StateAtEventAndReference
|
||||||
for _, l := range oldLatest {
|
for _, l := range oldLatest {
|
||||||
|
keep := true
|
||||||
for _, prevEvent := range prevEvents {
|
for _, prevEvent := range prevEvents {
|
||||||
if l.EventID == prevEvent.EventID && bytes.Compare(l.EventSHA256, prevEvent.EventSHA256) == 0 {
|
if l.EventID == prevEvent.EventID && bytes.Compare(l.EventSHA256, prevEvent.EventSHA256) == 0 {
|
||||||
// This event can be removed from the latest events cause we've found an event that references it.
|
// This event can be removed from the latest events cause we've found an event that references it.
|
||||||
// (If an event is referenced by another event then it can't be one of the latest events in the room
|
// (If an event is referenced by another event then it can't be one of the latest events in the room
|
||||||
// because we have an event that comes after it)
|
// because we have an event that comes after it)
|
||||||
continue
|
keep = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if l.EventNID == stateAtEvent.EventNID {
|
||||||
|
alreadyInLatest = true
|
||||||
|
}
|
||||||
|
if keep {
|
||||||
// Keep the event in the latest events.
|
// Keep the event in the latest events.
|
||||||
newLatest = append(newLatest, l)
|
newLatest = append(newLatest, l)
|
||||||
}
|
}
|
||||||
|
@ -79,8 +87,9 @@ func doUpdateLatestEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !alreadyReferenced {
|
if !alreadyReferenced && !alreadyInLatest {
|
||||||
// This event is not referenced by any of the events in the room.
|
// This event is not referenced by any of the events in the room
|
||||||
|
// and the event is not already in the latest events.
|
||||||
// Add it to the latest events
|
// Add it to the latest events
|
||||||
newLatest = append(newLatest, types.StateAtEventAndReference{
|
newLatest = append(newLatest, types.StateAtEventAndReference{
|
||||||
StateAtEvent: stateAtEvent,
|
StateAtEvent: stateAtEvent,
|
||||||
|
|
|
@ -31,8 +31,7 @@ INSERT INTO event_state_keys (event_state_key_nid, event_state_key) VALUES
|
||||||
const insertEventStateKeyNIDSQL = "" +
|
const insertEventStateKeyNIDSQL = "" +
|
||||||
"INSERT INTO event_state_keys (event_state_key) VALUES ($1)" +
|
"INSERT INTO event_state_keys (event_state_key) VALUES ($1)" +
|
||||||
" ON CONFLICT ON CONSTRAINT event_state_key_unique" +
|
" ON CONFLICT ON CONSTRAINT event_state_key_unique" +
|
||||||
" DO UPDATE SET event_state_key = $1" +
|
" DO NOTHING RETURNING (event_state_key_nid)"
|
||||||
" RETURNING (event_state_key_nid)"
|
|
||||||
|
|
||||||
const selectEventStateKeyNIDSQL = "" +
|
const selectEventStateKeyNIDSQL = "" +
|
||||||
"SELECT event_state_key_nid FROM event_state_keys WHERE event_state_key = $1"
|
"SELECT event_state_key_nid FROM event_state_keys WHERE event_state_key = $1"
|
||||||
|
|
|
@ -50,14 +50,18 @@ INSERT INTO event_types (event_type_nid, event_type) VALUES
|
||||||
// In that case the ID will be assigned using the next value from the sequence.
|
// In that case the ID will be assigned using the next value from the sequence.
|
||||||
// We use `RETURNING` to tell postgres to return the assigned ID.
|
// We use `RETURNING` to tell postgres to return the assigned ID.
|
||||||
// But it's possible that the type was added in a query that raced with us.
|
// But it's possible that the type was added in a query that raced with us.
|
||||||
// This will result in a conflict on the event_type_unique constraint.
|
// This will result in a conflict on the event_type_unique constraint, in this
|
||||||
// We peform a update that does nothing rather that doing nothing at all because
|
// case we do nothing. Postgresql won't return a row in that case so we rely on
|
||||||
// postgres won't return anything unless we touch a row in the table.
|
// the caller catching the sql.ErrNoRows error and running a select to get the row.
|
||||||
|
// We could get postgresql to return the row on a conflict by updating the row
|
||||||
|
// but it doesn't seem like a good idea to modify the rows just to make postgresql
|
||||||
|
// return it. Modifying the rows will cause postgres to assign a new tuple for the
|
||||||
|
// row even though the data doesn't change resulting in unncesssary modifications
|
||||||
|
// to the indexes.
|
||||||
const insertEventTypeNIDSQL = "" +
|
const insertEventTypeNIDSQL = "" +
|
||||||
"INSERT INTO event_types (event_type) VALUES ($1)" +
|
"INSERT INTO event_types (event_type) VALUES ($1)" +
|
||||||
" ON CONFLICT ON CONSTRAINT event_type_unique" +
|
" ON CONFLICT ON CONSTRAINT event_type_unique" +
|
||||||
" DO UPDATE SET event_type = $1" +
|
" DO NOTHING RETURNING (event_type_nid)"
|
||||||
" RETURNING (event_type_nid)"
|
|
||||||
|
|
||||||
const selectEventTypeNIDSQL = "" +
|
const selectEventTypeNIDSQL = "" +
|
||||||
"SELECT event_type_nid FROM event_types WHERE event_type = $1"
|
"SELECT event_type_nid FROM event_types WHERE event_type = $1"
|
||||||
|
|
|
@ -47,9 +47,12 @@ const insertEventSQL = "" +
|
||||||
"INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids)" +
|
"INSERT INTO events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids)" +
|
||||||
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||||
" ON CONFLICT ON CONSTRAINT event_id_unique" +
|
" ON CONFLICT ON CONSTRAINT event_id_unique" +
|
||||||
" DO UPDATE SET event_id = $1" +
|
" DO NOTHING" +
|
||||||
" RETURNING event_nid, state_snapshot_nid"
|
" RETURNING event_nid, state_snapshot_nid"
|
||||||
|
|
||||||
|
const selectEventSQL = "" +
|
||||||
|
"SELECT event_nid, state_snapshot_nid FROM events WHERE event_id = $1"
|
||||||
|
|
||||||
// Bulk lookup of events by string ID.
|
// Bulk lookup of events by string ID.
|
||||||
// Sort by the numeric IDs for event type and state key.
|
// Sort by the numeric IDs for event type and state key.
|
||||||
// This means we can use binary search to lookup entries by type and state key.
|
// This means we can use binary search to lookup entries by type and state key.
|
||||||
|
@ -71,6 +74,7 @@ const bulkSelectStateAtEventAndReferenceSQL = "" +
|
||||||
|
|
||||||
type eventStatements struct {
|
type eventStatements struct {
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
|
selectEventStmt *sql.Stmt
|
||||||
bulkSelectStateEventByIDStmt *sql.Stmt
|
bulkSelectStateEventByIDStmt *sql.Stmt
|
||||||
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
||||||
updateEventStateStmt *sql.Stmt
|
updateEventStateStmt *sql.Stmt
|
||||||
|
@ -85,6 +89,9 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
|
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.selectEventStmt, err = db.Prepare(selectEventSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
if s.bulkSelectStateEventByIDStmt, err = db.Prepare(bulkSelectStateEventByIDSQL); err != nil {
|
if s.bulkSelectStateEventByIDStmt, err = db.Prepare(bulkSelectStateEventByIDSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -119,6 +126,13 @@ func (s *eventStatements) insertEvent(
|
||||||
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
|
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *eventStatements) selectEvent(eventID string) (types.EventNID, types.StateSnapshotNID, error) {
|
||||||
|
var eventNID int64
|
||||||
|
var stateNID int64
|
||||||
|
err := s.selectEventStmt.QueryRow(eventID).Scan(&eventNID, &stateNID)
|
||||||
|
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.StateEntry, error) {
|
func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.StateEntry, error) {
|
||||||
rows, err := s.bulkSelectStateEventByIDStmt.Query(pq.StringArray(eventIDs))
|
rows, err := s.bulkSelectStateEventByIDStmt.Query(pq.StringArray(eventIDs))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -134,9 +148,9 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S
|
||||||
for ; rows.Next(); i++ {
|
for ; rows.Next(); i++ {
|
||||||
result := &results[i]
|
result := &results[i]
|
||||||
if err = rows.Scan(
|
if err = rows.Scan(
|
||||||
&result.EventNID,
|
|
||||||
&result.EventTypeNID,
|
&result.EventTypeNID,
|
||||||
&result.EventStateKeyNID,
|
&result.EventStateKeyNID,
|
||||||
|
&result.EventNID,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -163,9 +177,9 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types
|
||||||
for ; rows.Next(); i++ {
|
for ; rows.Next(); i++ {
|
||||||
result := &results[i]
|
result := &results[i]
|
||||||
if err = rows.Scan(
|
if err = rows.Scan(
|
||||||
&result.EventNID,
|
|
||||||
&result.EventTypeNID,
|
&result.EventTypeNID,
|
||||||
&result.EventStateKeyNID,
|
&result.EventStateKeyNID,
|
||||||
|
&result.EventNID,
|
||||||
&result.BeforeStateSnapshotNID,
|
&result.BeforeStateSnapshotNID,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS rooms (
|
||||||
-- The most recent events in the room that aren't referenced by another event.
|
-- The most recent events in the room that aren't referenced by another event.
|
||||||
-- This list may empty if the server hasn't joined the room yet.
|
-- This list may empty if the server hasn't joined the room yet.
|
||||||
-- (The server will be in that state while it stores the events for the initial state of the room)
|
-- (The server will be in that state while it stores the events for the initial state of the room)
|
||||||
latest_event_nids BIGINT[] NOT NULL
|
latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[]
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
|
@ -24,8 +24,7 @@ CREATE TABLE IF NOT EXISTS rooms (
|
||||||
const insertRoomNIDSQL = "" +
|
const insertRoomNIDSQL = "" +
|
||||||
"INSERT INTO rooms (room_id) VALUES ($1)" +
|
"INSERT INTO rooms (room_id) VALUES ($1)" +
|
||||||
" ON CONFLICT ON CONSTRAINT room_id_unique" +
|
" ON CONFLICT ON CONSTRAINT room_id_unique" +
|
||||||
" DO UPDATE SET room_id = $1" +
|
" DO NOTHING RETURNING (room_nid)"
|
||||||
" RETURNING (room_nid)"
|
|
||||||
|
|
||||||
const selectRoomNIDSQL = "" +
|
const selectRoomNIDSQL = "" +
|
||||||
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
||||||
|
|
|
@ -73,7 +73,13 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ
|
||||||
event.EventReference().EventSHA256,
|
event.EventReference().EventSHA256,
|
||||||
authEventNIDs,
|
authEventNIDs,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return 0, types.StateAtEvent{}, err
|
if err == sql.ErrNoRows {
|
||||||
|
// We've already inserted the event so select the numeric event ID
|
||||||
|
eventNID, stateNID, err = d.statements.selectEvent(event.EventID())
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return 0, types.StateAtEvent{}, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = d.statements.insertEventJSON(eventNID, event.JSON()); err != nil {
|
if err = d.statements.insertEventJSON(eventNID, event.JSON()); err != nil {
|
||||||
|
@ -97,12 +103,13 @@ func (d *Database) assignRoomNID(roomID string) (types.RoomNID, error) {
|
||||||
roomNID, err := d.statements.selectRoomNID(roomID)
|
roomNID, err := d.statements.selectRoomNID(roomID)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// We don't have a numeric ID so insert one into the database.
|
// We don't have a numeric ID so insert one into the database.
|
||||||
return d.statements.insertRoomNID(roomID)
|
roomNID, err = d.statements.insertRoomNID(roomID)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// We raced with another insert so run the select again.
|
||||||
|
roomNID, err = d.statements.selectRoomNID(roomID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
return roomNID, err
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return roomNID, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, error) {
|
func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, error) {
|
||||||
|
@ -110,12 +117,13 @@ func (d *Database) assignEventTypeNID(eventType string) (types.EventTypeNID, err
|
||||||
eventTypeNID, err := d.statements.selectEventTypeNID(eventType)
|
eventTypeNID, err := d.statements.selectEventTypeNID(eventType)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// We don't have a numeric ID so insert one into the database.
|
// We don't have a numeric ID so insert one into the database.
|
||||||
return d.statements.insertEventTypeNID(eventType)
|
eventTypeNID, err = d.statements.insertEventTypeNID(eventType)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// We raced with another insert so run the select again.
|
||||||
|
eventTypeNID, err = d.statements.selectEventTypeNID(eventType)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
return eventTypeNID, err
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return eventTypeNID, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
|
func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyNID, error) {
|
||||||
|
@ -123,12 +131,13 @@ func (d *Database) assignStateKeyNID(eventStateKey string) (types.EventStateKeyN
|
||||||
eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey)
|
eventStateKeyNID, err := d.statements.selectEventStateKeyNID(eventStateKey)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// We don't have a numeric ID so insert one into the database.
|
// We don't have a numeric ID so insert one into the database.
|
||||||
return d.statements.insertEventStateKeyNID(eventStateKey)
|
eventStateKeyNID, err = d.statements.insertEventStateKeyNID(eventStateKey)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// We raced with another insert so run the select again.
|
||||||
|
eventStateKeyNID, err = d.statements.selectEventStateKeyNID(eventStateKey)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
return eventStateKeyNID, err
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return eventStateKeyNID, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateEntriesForEventIDs implements input.EventDatabase
|
// StateEntriesForEventIDs implements input.EventDatabase
|
||||||
|
|
Loading…
Reference in New Issue