Write to an output kafka log for new room events in the roomserver. (#20)
* Track which events have been sent and the last event sent for each room * Write output events * comment that an event could be sent twicemain
parent
45d1e61a9d
commit
237b912cf5
|
@ -0,0 +1,85 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
// An OutputRoomEvent is written when the roomserver receives a new event.
|
||||||
|
type OutputRoomEvent struct {
|
||||||
|
// The JSON bytes of the event.
|
||||||
|
Event []byte
|
||||||
|
// The state event IDs needed to determine who can see this event.
|
||||||
|
// This can be used to tell which users to send the event to.
|
||||||
|
VisibilityEventIDs []string
|
||||||
|
// The latest events in the room after this event.
|
||||||
|
// This can be used to set the prev events for new events in the room.
|
||||||
|
// This also can be used to get the full current state after this event.
|
||||||
|
LatestEventIDs []string
|
||||||
|
// The state event IDs that were added to the state of the room by this event.
|
||||||
|
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
||||||
|
// view of the current state of the room.
|
||||||
|
AddsStateEventIDs []string
|
||||||
|
// The state event IDs that were removed from the state of the room by this event.
|
||||||
|
RemovesStateEventIDs []string
|
||||||
|
// The ID of the event that was output before this event.
|
||||||
|
// Or the empty string if this is the first event output for this room.
|
||||||
|
// This is used by consumers to check if they can safely update their
|
||||||
|
// current state using the delta supplied in AddsStateEventIDs and
|
||||||
|
// RemovesStateEventIDs.
|
||||||
|
// If the LastSentEventID doesn't match what they were expecting it to be
|
||||||
|
// they can use the LatestEventIDs to request the full current state.
|
||||||
|
LastSentEventID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON implements json.Unmarshaller
|
||||||
|
func (ore *OutputRoomEvent) UnmarshalJSON(data []byte) error {
|
||||||
|
// Create a struct rather than unmarshalling directly into the OutputRoomEvent
|
||||||
|
// so that we can use json.RawMessage.
|
||||||
|
// We use json.RawMessage so that the event JSON is sent as JSON rather than
|
||||||
|
// being base64 encoded which is the default for []byte.
|
||||||
|
var content struct {
|
||||||
|
Event *json.RawMessage
|
||||||
|
VisibilityEventIDs []string
|
||||||
|
LatestEventIDs []string
|
||||||
|
AddsStateEventIDs []string
|
||||||
|
RemovesStateEventIDs []string
|
||||||
|
LastSentEventID string
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(data, &content); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if content.Event != nil {
|
||||||
|
ore.Event = []byte(*content.Event)
|
||||||
|
}
|
||||||
|
ore.VisibilityEventIDs = content.VisibilityEventIDs
|
||||||
|
ore.LatestEventIDs = content.LatestEventIDs
|
||||||
|
ore.AddsStateEventIDs = content.AddsStateEventIDs
|
||||||
|
ore.RemovesStateEventIDs = content.RemovesStateEventIDs
|
||||||
|
ore.LastSentEventID = content.LastSentEventID
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON implements json.Marshaller
|
||||||
|
func (ore OutputRoomEvent) MarshalJSON() ([]byte, error) {
|
||||||
|
// Create a struct rather than marshalling directly from the OutputRoomEvent
|
||||||
|
// so that we can use json.RawMessage.
|
||||||
|
// We use json.RawMessage so that the event JSON is sent as JSON rather than
|
||||||
|
// being base64 encoded which is the default for []byte.
|
||||||
|
event := json.RawMessage(ore.Event)
|
||||||
|
content := struct {
|
||||||
|
Event *json.RawMessage
|
||||||
|
VisibilityEventIDs []string
|
||||||
|
LatestEventIDs []string
|
||||||
|
AddsStateEventIDs []string
|
||||||
|
RemovesStateEventIDs []string
|
||||||
|
LastSentEventID string
|
||||||
|
}{
|
||||||
|
Event: &event,
|
||||||
|
VisibilityEventIDs: ore.VisibilityEventIDs,
|
||||||
|
LatestEventIDs: ore.LatestEventIDs,
|
||||||
|
AddsStateEventIDs: ore.AddsStateEventIDs,
|
||||||
|
RemovesStateEventIDs: ore.RemovesStateEventIDs,
|
||||||
|
LastSentEventID: ore.LastSentEventID,
|
||||||
|
}
|
||||||
|
return json.Marshal(&content)
|
||||||
|
}
|
|
@ -35,14 +35,32 @@ type Consumer struct {
|
||||||
Consumer sarama.Consumer
|
Consumer sarama.Consumer
|
||||||
// The database used to store the room events.
|
// The database used to store the room events.
|
||||||
DB ConsumerDatabase
|
DB ConsumerDatabase
|
||||||
|
Producer sarama.SyncProducer
|
||||||
// The kafkaesque topic to consume room events from.
|
// The kafkaesque topic to consume room events from.
|
||||||
// This is the name used in kafka to identify the stream to consume events from.
|
// This is the name used in kafka to identify the stream to consume events from.
|
||||||
RoomEventTopic string
|
InputRoomEventTopic string
|
||||||
|
// The kafkaesque topic to output new room events to.
|
||||||
|
// This is the name used in kafka to identify the stream to write events to.
|
||||||
|
OutputRoomEventTopic string
|
||||||
// The ErrorLogger for this consumer.
|
// The ErrorLogger for this consumer.
|
||||||
// If left as nil then the consumer will panic when it encounters an error
|
// If left as nil then the consumer will panic when it encounters an error
|
||||||
ErrorLogger ErrorLogger
|
ErrorLogger ErrorLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteOutputRoomEvent implements OutputRoomEventWriter
|
||||||
|
func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error {
|
||||||
|
var m sarama.ProducerMessage
|
||||||
|
value, err := json.Marshal(output)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.Topic = c.OutputRoomEventTopic
|
||||||
|
m.Key = sarama.StringEncoder("")
|
||||||
|
m.Value = sarama.ByteEncoder(value)
|
||||||
|
_, _, err = c.Producer.SendMessage(&m)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Start starts the consumer consuming.
|
// Start starts the consumer consuming.
|
||||||
// Starts up a goroutine for each partition in the kafka stream.
|
// Starts up a goroutine for each partition in the kafka stream.
|
||||||
// Returns nil once all the goroutines are started.
|
// Returns nil once all the goroutines are started.
|
||||||
|
@ -50,7 +68,7 @@ type Consumer struct {
|
||||||
func (c *Consumer) Start() error {
|
func (c *Consumer) Start() error {
|
||||||
offsets := map[int32]int64{}
|
offsets := map[int32]int64{}
|
||||||
|
|
||||||
partitions, err := c.Consumer.Partitions(c.RoomEventTopic)
|
partitions, err := c.Consumer.Partitions(c.InputRoomEventTopic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -59,7 +77,7 @@ func (c *Consumer) Start() error {
|
||||||
offsets[partition] = sarama.OffsetOldest
|
offsets[partition] = sarama.OffsetOldest
|
||||||
}
|
}
|
||||||
|
|
||||||
storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic)
|
storedOffsets, err := c.DB.PartitionOffsets(c.InputRoomEventTopic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -70,7 +88,7 @@ func (c *Consumer) Start() error {
|
||||||
|
|
||||||
var partitionConsumers []sarama.PartitionConsumer
|
var partitionConsumers []sarama.PartitionConsumer
|
||||||
for partition, offset := range offsets {
|
for partition, offset := range offsets {
|
||||||
pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
|
pc, err := c.Consumer.ConsumePartition(c.InputRoomEventTopic, partition, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for _, p := range partitionConsumers {
|
for _, p := range partitionConsumers {
|
||||||
p.Close()
|
p.Close()
|
||||||
|
@ -95,7 +113,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
||||||
// If the message is invalid then log it and move onto the next message in the stream.
|
// If the message is invalid then log it and move onto the next message in the stream.
|
||||||
c.logError(message, err)
|
c.logError(message, err)
|
||||||
} else {
|
} else {
|
||||||
if err := processRoomEvent(c.DB, input); err != nil {
|
if err := processRoomEvent(c.DB, c, input); err != nil {
|
||||||
// If there was an error processing the message then log it and
|
// If there was an error processing the message then log it and
|
||||||
// move onto the next message in the stream.
|
// move onto the next message in the stream.
|
||||||
// TODO: If the error was due to a problem talking to the database
|
// TODO: If the error was due to a problem talking to the database
|
||||||
|
@ -105,7 +123,7 @@ func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Advance our position in the stream so that we will start at the right position after a restart.
|
// Advance our position in the stream so that we will start at the right position after a restart.
|
||||||
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
|
if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil {
|
||||||
c.logError(message, err)
|
c.logError(message, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,11 +36,20 @@ type RoomEventDatabase interface {
|
||||||
SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error
|
SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error
|
||||||
// Lookup the latest events in a room in preparation for an update.
|
// Lookup the latest events in a room in preparation for an update.
|
||||||
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
|
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
|
||||||
|
// Returns the latest events in the room and the last eventID sent to the log along with an updater.
|
||||||
// If this returns an error then no further action is required.
|
// If this returns an error then no further action is required.
|
||||||
GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error)
|
GetLatestEventsForUpdate(roomNID types.RoomNID) (
|
||||||
|
latestEvents []types.StateAtEventAndReference, lastEventIDSent string, updater types.RoomRecentEventsUpdater, err error,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
|
||||||
|
type OutputRoomEventWriter interface {
|
||||||
|
// Write an event.
|
||||||
|
WriteOutputRoomEvent(output api.OutputRoomEvent) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error {
|
||||||
// Parse and validate the event JSON
|
// Parse and validate the event JSON
|
||||||
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event)
|
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -95,7 +104,7 @@ func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the extremities of the event graph for the room
|
// Update the extremities of the event graph for the room
|
||||||
if err := updateLatestEvents(db, roomNID, stateAtEvent, event); err != nil {
|
if err := updateLatestEvents(db, ow, roomNID, stateAtEvent, event); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,13 @@ package input
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
// updateLatestEvents updates the list of latest events for this room.
|
// updateLatestEvents updates the list of latest events for this room in the database and writes the
|
||||||
|
// event to the output log.
|
||||||
// The latest events are the events that aren't referenced by another event in the database:
|
// The latest events are the events that aren't referenced by another event in the database:
|
||||||
//
|
//
|
||||||
// Time goes down the page. 1 is the m.room.create event (root).
|
// Time goes down the page. 1 is the m.room.create event (root).
|
||||||
|
@ -22,9 +24,9 @@ import (
|
||||||
// 7 <----- latest
|
// 7 <----- latest
|
||||||
//
|
//
|
||||||
func updateLatestEvents(
|
func updateLatestEvents(
|
||||||
db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
db RoomEventDatabase, ow OutputRoomEventWriter, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
oldLatest, updater, err := db.GetLatestEventsForUpdate(roomNID)
|
oldLatest, lastEventIDSent, updater, err := db.GetLatestEventsForUpdate(roomNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -42,22 +44,64 @@ func updateLatestEvents(
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = doUpdateLatestEvents(updater, oldLatest, roomNID, stateAtEvent, event)
|
err = doUpdateLatestEvents(updater, ow, oldLatest, lastEventIDSent, roomNID, stateAtEvent, event)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func doUpdateLatestEvents(
|
func doUpdateLatestEvents(
|
||||||
updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
updater types.RoomRecentEventsUpdater, ow OutputRoomEventWriter, oldLatest []types.StateAtEventAndReference, lastEventIDSent string, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event,
|
||||||
) error {
|
) error {
|
||||||
var err error
|
var err error
|
||||||
var prevEvents []gomatrixserverlib.EventReference
|
var prevEvents []gomatrixserverlib.EventReference
|
||||||
prevEvents = event.PrevEvents()
|
prevEvents = event.PrevEvents()
|
||||||
|
|
||||||
|
if hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID); err != nil {
|
||||||
|
return err
|
||||||
|
} else if hasBeenSent {
|
||||||
|
// Already sent this event so we can stop processing
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
|
if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if this event references any of the latest events in the room.
|
eventReference := event.EventReference()
|
||||||
|
// Check if this event is already referenced by another event in the room.
|
||||||
|
var alreadyReferenced bool
|
||||||
|
if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newLatest := calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
|
||||||
|
EventReference: eventReference,
|
||||||
|
StateAtEvent: stateAtEvent,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Send the event to the output logs.
|
||||||
|
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
|
||||||
|
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
|
||||||
|
// the write to the output log succeeds)
|
||||||
|
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
|
||||||
|
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
|
||||||
|
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
|
||||||
|
// necessary bookkeeping we'll keep the event sending synchronous for now.
|
||||||
|
if err = writeEvent(ow, lastEventIDSent, event, newLatest); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = updater.SetLatestEvents(roomNID, newLatest, stateAtEvent.EventNID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = updater.MarkEventAsSent(stateAtEvent.EventNID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func calculateLatest(oldLatest []types.StateAtEventAndReference, alreadyReferenced bool, prevEvents []gomatrixserverlib.EventReference, newEvent types.StateAtEventAndReference) []types.StateAtEventAndReference {
|
||||||
var alreadyInLatest bool
|
var alreadyInLatest bool
|
||||||
var newLatest []types.StateAtEventAndReference
|
var newLatest []types.StateAtEventAndReference
|
||||||
for _, l := range oldLatest {
|
for _, l := range oldLatest {
|
||||||
|
@ -71,7 +115,7 @@ func doUpdateLatestEvents(
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if l.EventNID == stateAtEvent.EventNID {
|
if l.EventNID == newEvent.EventNID {
|
||||||
alreadyInLatest = true
|
alreadyInLatest = true
|
||||||
}
|
}
|
||||||
if keep {
|
if keep {
|
||||||
|
@ -80,26 +124,28 @@ func doUpdateLatestEvents(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
eventReference := event.EventReference()
|
|
||||||
// Check if this event is already referenced by another event in the room.
|
|
||||||
var alreadyReferenced bool
|
|
||||||
if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !alreadyReferenced && !alreadyInLatest {
|
if !alreadyReferenced && !alreadyInLatest {
|
||||||
// This event is not referenced by any of the events in the room
|
// This event is not referenced by any of the events in the room
|
||||||
// and the event is not already in the latest events.
|
// and the event is not already in the latest events.
|
||||||
// Add it to the latest events
|
// Add it to the latest events
|
||||||
newLatest = append(newLatest, types.StateAtEventAndReference{
|
newLatest = append(newLatest, newEvent)
|
||||||
StateAtEvent: stateAtEvent,
|
|
||||||
EventReference: eventReference,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = updater.SetLatestEvents(roomNID, newLatest); err != nil {
|
return newLatest
|
||||||
return err
|
}
|
||||||
}
|
|
||||||
|
func writeEvent(ow OutputRoomEventWriter, lastEventIDSent string, event gomatrixserverlib.Event, latest []types.StateAtEventAndReference) error {
|
||||||
return nil
|
|
||||||
|
latestEventIDs := make([]string, len(latest))
|
||||||
|
for i := range latest {
|
||||||
|
latestEventIDs[i] = latest[i].EventID
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Fill out AddsStateEventIDs and RemovesStateEventIDs
|
||||||
|
// TODO: Fill out VisibilityStateIDs
|
||||||
|
return ow.WriteOutputRoomEvent(api.OutputRoomEvent{
|
||||||
|
Event: event.JSON(),
|
||||||
|
LastSentEventID: lastEventIDSent,
|
||||||
|
LatestEventIDs: latestEventIDs,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,8 @@ import (
|
||||||
var (
|
var (
|
||||||
database = os.Getenv("DATABASE")
|
database = os.Getenv("DATABASE")
|
||||||
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
||||||
roomEventTopic = os.Getenv("TOPIC_ROOM_EVENT")
|
inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
|
||||||
|
outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT")
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -26,10 +27,17 @@ func main() {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
kafkaProducer, err := sarama.NewSyncProducer(kafkaURIs, nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
consumer := input.Consumer{
|
consumer := input.Consumer{
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
DB: db,
|
DB: db,
|
||||||
RoomEventTopic: roomEventTopic,
|
Producer: kafkaProducer,
|
||||||
|
InputRoomEventTopic: inputRoomEventTopic,
|
||||||
|
OutputRoomEventTopic: outputRoomEventTopic,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = consumer.Start(); err != nil {
|
if err = consumer.Start(); err != nil {
|
||||||
|
|
|
@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS events (
|
||||||
-- Local numeric ID for the state_key of the event
|
-- Local numeric ID for the state_key of the event
|
||||||
-- This is 0 if the event is not a state event.
|
-- This is 0 if the event is not a state event.
|
||||||
event_state_key_nid BIGINT NOT NULL,
|
event_state_key_nid BIGINT NOT NULL,
|
||||||
|
-- Whether the event has been written to the output log.
|
||||||
|
sent_to_output BOOLEAN NOT NULL DEFAULT FALSE,
|
||||||
-- Local numeric ID for the state at the event.
|
-- Local numeric ID for the state at the event.
|
||||||
-- This is 0 if we don't know the state at the event.
|
-- This is 0 if we don't know the state at the event.
|
||||||
-- If the state is not 0 then this event is part of the contiguous
|
-- If the state is not 0 then this event is part of the contiguous
|
||||||
|
@ -68,6 +70,15 @@ const bulkSelectStateAtEventByIDSQL = "" +
|
||||||
const updateEventStateSQL = "" +
|
const updateEventStateSQL = "" +
|
||||||
"UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1"
|
"UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1"
|
||||||
|
|
||||||
|
const selectEventSentToOutputSQL = "" +
|
||||||
|
"SELECT sent_to_output FROM events WHERE event_nid = $1"
|
||||||
|
|
||||||
|
const updateEventSentToOutputSQL = "" +
|
||||||
|
"UPDATE events SET sent_to_output = TRUE WHERE event_nid = $1"
|
||||||
|
|
||||||
|
const selectEventIDSQL = "" +
|
||||||
|
"SELECT event_id FROM events WHERE event_nid = $1"
|
||||||
|
|
||||||
const bulkSelectStateAtEventAndReferenceSQL = "" +
|
const bulkSelectStateAtEventAndReferenceSQL = "" +
|
||||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
|
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
|
||||||
" FROM events WHERE event_nid = ANY($1)"
|
" FROM events WHERE event_nid = ANY($1)"
|
||||||
|
@ -78,6 +89,9 @@ type eventStatements struct {
|
||||||
bulkSelectStateEventByIDStmt *sql.Stmt
|
bulkSelectStateEventByIDStmt *sql.Stmt
|
||||||
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
bulkSelectStateAtEventByIDStmt *sql.Stmt
|
||||||
updateEventStateStmt *sql.Stmt
|
updateEventStateStmt *sql.Stmt
|
||||||
|
selectEventSentToOutputStmt *sql.Stmt
|
||||||
|
updateEventSentToOutputStmt *sql.Stmt
|
||||||
|
selectEventIDStmt *sql.Stmt
|
||||||
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
|
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,6 +115,15 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.updateEventStateStmt, err = db.Prepare(updateEventStateSQL); err != nil {
|
if s.updateEventStateStmt, err = db.Prepare(updateEventStateSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.updateEventSentToOutputStmt, err = db.Prepare(updateEventSentToOutputSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectEventSentToOutputStmt, err = db.Prepare(selectEventSentToOutputSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectEventIDStmt, err = db.Prepare(selectEventIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil {
|
if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -199,6 +222,21 @@ func (s *eventStatements) updateEventState(eventNID types.EventNID, stateNID typ
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *eventStatements) selectEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) (sentToOutput bool, err error) {
|
||||||
|
err = txn.Stmt(s.selectEventSentToOutputStmt).QueryRow(int64(eventNID)).Scan(&sentToOutput)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *eventStatements) updateEventSentToOutput(txn *sql.Tx, eventNID types.EventNID) error {
|
||||||
|
_, err := txn.Stmt(s.updateEventSentToOutputStmt).Exec(int64(eventNID))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *eventStatements) selectEventID(txn *sql.Tx, eventNID types.EventNID) (eventID string, err error) {
|
||||||
|
err = txn.Stmt(s.selectEventIDStmt).QueryRow(int64(eventNID)).Scan(&eventID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) {
|
func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) {
|
||||||
nids := make([]int64, len(eventNIDs))
|
nids := make([]int64, len(eventNIDs))
|
||||||
for i := range eventNIDs {
|
for i := range eventNIDs {
|
||||||
|
|
|
@ -16,7 +16,9 @@ CREATE TABLE IF NOT EXISTS rooms (
|
||||||
-- The most recent events in the room that aren't referenced by another event.
|
-- The most recent events in the room that aren't referenced by another event.
|
||||||
-- This list may empty if the server hasn't joined the room yet.
|
-- This list may empty if the server hasn't joined the room yet.
|
||||||
-- (The server will be in that state while it stores the events for the initial state of the room)
|
-- (The server will be in that state while it stores the events for the initial state of the room)
|
||||||
latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[]
|
latest_event_nids BIGINT[] NOT NULL DEFAULT '{}'::BIGINT[],
|
||||||
|
-- The last event written to the output log for this room.
|
||||||
|
last_event_sent_nid BIGINT NOT NULL DEFAULT 0
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
|
@ -30,10 +32,10 @@ const selectRoomNIDSQL = "" +
|
||||||
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
"SELECT room_nid FROM rooms WHERE room_id = $1"
|
||||||
|
|
||||||
const selectLatestEventNIDsSQL = "" +
|
const selectLatestEventNIDsSQL = "" +
|
||||||
"SELECT latest_event_nids FROM rooms WHERE room_nid = $1 FOR UPDATE"
|
"SELECT latest_event_nids, last_event_sent_nid FROM rooms WHERE room_nid = $1 FOR UPDATE"
|
||||||
|
|
||||||
const updateLatestEventNIDsSQL = "" +
|
const updateLatestEventNIDsSQL = "" +
|
||||||
"UPDATE rooms SET latest_event_nids = $2 WHERE room_nid = $1"
|
"UPDATE rooms SET latest_event_nids = $2, last_event_sent_nid = $3 WHERE room_nid = $1"
|
||||||
|
|
||||||
type roomStatements struct {
|
type roomStatements struct {
|
||||||
insertRoomNIDStmt *sql.Stmt
|
insertRoomNIDStmt *sql.Stmt
|
||||||
|
@ -74,24 +76,25 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) {
|
||||||
return types.RoomNID(roomNID), err
|
return types.RoomNID(roomNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, error) {
|
func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, error) {
|
||||||
var nids pq.Int64Array
|
var nids pq.Int64Array
|
||||||
err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids)
|
var lastEventSentNID int64
|
||||||
|
err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids, &lastEventSentNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
eventNIDs := make([]types.EventNID, len(nids))
|
eventNIDs := make([]types.EventNID, len(nids))
|
||||||
for i := range nids {
|
for i := range nids {
|
||||||
eventNIDs[i] = types.EventNID(nids[i])
|
eventNIDs[i] = types.EventNID(nids[i])
|
||||||
}
|
}
|
||||||
return eventNIDs, nil
|
return eventNIDs, types.EventNID(lastEventSentNID), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID) error {
|
func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID) error {
|
||||||
nids := make([]int64, len(eventNIDs))
|
nids := make([]int64, len(eventNIDs))
|
||||||
for i := range eventNIDs {
|
for i := range eventNIDs {
|
||||||
nids[i] = int64(eventNIDs[i])
|
nids[i] = int64(eventNIDs[i])
|
||||||
}
|
}
|
||||||
_, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids))
|
_, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids), int64(lastEventSentNID))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -206,22 +206,30 @@ func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.S
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestEventsForUpdate implements input.EventDatabase
|
// GetLatestEventsForUpdate implements input.EventDatabase
|
||||||
func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) {
|
func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, string, types.RoomRecentEventsUpdater, error) {
|
||||||
txn, err := d.db.Begin()
|
txn, err := d.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, "", nil, err
|
||||||
}
|
}
|
||||||
eventNIDs, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID)
|
eventNIDs, lastEventNIDSent, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
txn.Rollback()
|
txn.Rollback()
|
||||||
return nil, nil, err
|
return nil, "", nil, err
|
||||||
}
|
}
|
||||||
stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(txn, eventNIDs)
|
stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(txn, eventNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
txn.Rollback()
|
txn.Rollback()
|
||||||
return nil, nil, err
|
return nil, "", nil, err
|
||||||
}
|
}
|
||||||
return stateAndRefs, &roomRecentEventsUpdater{txn, d}, nil
|
var lastEventIDSent string
|
||||||
|
if lastEventNIDSent != 0 {
|
||||||
|
lastEventIDSent, err = d.statements.selectEventID(txn, lastEventNIDSent)
|
||||||
|
if err != nil {
|
||||||
|
txn.Rollback()
|
||||||
|
return nil, "", nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return stateAndRefs, lastEventIDSent, &roomRecentEventsUpdater{txn, d}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type roomRecentEventsUpdater struct {
|
type roomRecentEventsUpdater struct {
|
||||||
|
@ -249,12 +257,20 @@ func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference) error {
|
func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference, lastEventNIDSent types.EventNID) error {
|
||||||
eventNIDs := make([]types.EventNID, len(latest))
|
eventNIDs := make([]types.EventNID, len(latest))
|
||||||
for i := range latest {
|
for i := range latest {
|
||||||
eventNIDs[i] = latest[i].EventNID
|
eventNIDs[i] = latest[i].EventNID
|
||||||
}
|
}
|
||||||
return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs)
|
return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs, lastEventNIDSent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *roomRecentEventsUpdater) HasEventBeenSent(eventNID types.EventNID) (bool, error) {
|
||||||
|
return u.d.statements.selectEventSentToOutput(u.txn, eventNID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *roomRecentEventsUpdater) MarkEventAsSent(eventNID types.EventNID) error {
|
||||||
|
return u.d.statements.updateEventSentToOutput(u.txn, eventNID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *roomRecentEventsUpdater) Commit() error {
|
func (u *roomRecentEventsUpdater) Commit() error {
|
||||||
|
|
|
@ -143,7 +143,11 @@ type RoomRecentEventsUpdater interface {
|
||||||
IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error)
|
IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error)
|
||||||
// Set the list of latest events for the room.
|
// Set the list of latest events for the room.
|
||||||
// This replaces the current list stored in the database with the given list
|
// This replaces the current list stored in the database with the given list
|
||||||
SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference) error
|
SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference, lastEventNIDSent EventNID) error
|
||||||
|
// Check if the event has already be written to the output logs.
|
||||||
|
HasEventBeenSent(eventNID EventNID) (bool, error)
|
||||||
|
// Mark the event as having been sent to the output logs.
|
||||||
|
MarkEventAsSent(eventNID EventNID) error
|
||||||
// Commit the transaction
|
// Commit the transaction
|
||||||
Commit() error
|
Commit() error
|
||||||
// Rollback the transaction.
|
// Rollback the transaction.
|
||||||
|
|
Loading…
Reference in New Issue