From de6529d766b95baf0fc425dc166d6946a0b36edc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 Dec 2017 15:22:06 +0000 Subject: [PATCH] Ensure we only process one event at a time (#376) --- .../matrix-org/dendrite/roomserver/input/events.go | 5 +++++ .../matrix-org/dendrite/roomserver/input/input.go | 7 +++++++ .../matrix-org/dendrite/roomserver/input/latest_events.go | 1 + 3 files changed, 13 insertions(+) diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 91de6435..02cd4506 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -69,6 +69,11 @@ type OutputRoomEventWriter interface { WriteOutputEvents(roomID string, updates []api.OutputEvent) error } +// processRoomEvent can only be called once at a time +// +// TODO(#375): This should be rewritten to allow concurrent calls. The +// difficulty is in ensuring that we correctly annotate events with the correct +// state deltas when sending to kafka streams func processRoomEvent( ctx context.Context, db RoomEventDatabase, diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index 253ef9ff..4d5ba5cb 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "net/http" + "sync" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" @@ -33,6 +34,8 @@ type RoomserverInputAPI struct { // The kafkaesque topic to output new room events to. // This is the name used in kafka to identify the stream to write events to. OutputRoomEventTopic string + // Protects calls to processRoomEvent + mutex sync.Mutex } // WriteOutputEvents implements OutputRoomEventWriter @@ -59,6 +62,10 @@ func (r *RoomserverInputAPI) InputRoomEvents( response *api.InputRoomEventsResponse, ) error { for i := range request.InputRoomEvents { + // We lock as processRoomEvent can ony be called once at a time + r.mutex.Lock() + defer r.mutex.Unlock() + if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 2b82bcba..c2f06393 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -42,6 +42,7 @@ import ( // | // 7 <----- latest // +// Can only be called once at a time func updateLatestEvents( ctx context.Context, db RoomEventDatabase,