Ensure we only process one event at a time (#376)
parent
4bb862864c
commit
de6529d766
|
@ -69,6 +69,11 @@ type OutputRoomEventWriter interface {
|
||||||
WriteOutputEvents(roomID string, updates []api.OutputEvent) error
|
WriteOutputEvents(roomID string, updates []api.OutputEvent) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processRoomEvent can only be called once at a time
|
||||||
|
//
|
||||||
|
// TODO(#375): This should be rewritten to allow concurrent calls. The
|
||||||
|
// difficulty is in ensuring that we correctly annotate events with the correct
|
||||||
|
// state deltas when sending to kafka streams
|
||||||
func processRoomEvent(
|
func processRoomEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
db RoomEventDatabase,
|
db RoomEventDatabase,
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -33,6 +34,8 @@ type RoomserverInputAPI struct {
|
||||||
// The kafkaesque topic to output new room events to.
|
// The kafkaesque topic to output new room events to.
|
||||||
// This is the name used in kafka to identify the stream to write events to.
|
// This is the name used in kafka to identify the stream to write events to.
|
||||||
OutputRoomEventTopic string
|
OutputRoomEventTopic string
|
||||||
|
// Protects calls to processRoomEvent
|
||||||
|
mutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOutputEvents implements OutputRoomEventWriter
|
// WriteOutputEvents implements OutputRoomEventWriter
|
||||||
|
@ -59,6 +62,10 @@ func (r *RoomserverInputAPI) InputRoomEvents(
|
||||||
response *api.InputRoomEventsResponse,
|
response *api.InputRoomEventsResponse,
|
||||||
) error {
|
) error {
|
||||||
for i := range request.InputRoomEvents {
|
for i := range request.InputRoomEvents {
|
||||||
|
// We lock as processRoomEvent can ony be called once at a time
|
||||||
|
r.mutex.Lock()
|
||||||
|
defer r.mutex.Unlock()
|
||||||
|
|
||||||
if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
|
if err := processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ import (
|
||||||
// |
|
// |
|
||||||
// 7 <----- latest
|
// 7 <----- latest
|
||||||
//
|
//
|
||||||
|
// Can only be called once at a time
|
||||||
func updateLatestEvents(
|
func updateLatestEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
db RoomEventDatabase,
|
db RoomEventDatabase,
|
||||||
|
|
Loading…
Reference in New Issue