diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 82ece230..b8279a86 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -38,8 +38,7 @@ type Inputer struct { ServerName gomatrixserverlib.ServerName ACLs *acls.ServerACLs OutputRoomEventTopic string - - workers sync.Map // room ID -> *inputWorker + workers sync.Map // room ID -> *inputWorker } type inputTask struct { @@ -52,7 +51,7 @@ type inputTask struct { type inputWorker struct { r *Inputer running atomic.Bool - input chan *inputTask + input *fifoQueue } // Guarded by a CAS on w.running @@ -60,7 +59,11 @@ func (w *inputWorker) start() { defer w.running.Store(false) for { select { - case task := <-w.input: + case <-w.input.wait(): + task, ok := w.input.pop() + if !ok { + continue + } hooks.Run(hooks.KindNewEventReceived, task.event.Event) _, task.err = w.r.processRoomEvent(task.ctx, task.event) if task.err == nil { @@ -143,7 +146,7 @@ func (r *Inputer) InputRoomEvents( // room - the channel will be quite small as it's just pointer types. w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ r: r, - input: make(chan *inputTask, 32), + input: newFIFOQueue(), }) worker := w.(*inputWorker) @@ -160,7 +163,7 @@ func (r *Inputer) InputRoomEvents( if worker.running.CAS(false, true) { go worker.start() } - worker.input <- tasks[i] + worker.input.push(tasks[i]) } // Wait for all of the workers to return results about our tasks. diff --git a/roomserver/internal/input/input_fifo.go b/roomserver/internal/input/input_fifo.go new file mode 100644 index 00000000..694b1724 --- /dev/null +++ b/roomserver/internal/input/input_fifo.go @@ -0,0 +1,64 @@ +package input + +import ( + "sync" +) + +type fifoQueue struct { + tasks []*inputTask + count int + mutex sync.Mutex + notifs chan struct{} +} + +func newFIFOQueue() *fifoQueue { + q := &fifoQueue{ + notifs: make(chan struct{}, 1), + } + return q +} + +func (q *fifoQueue) push(frame *inputTask) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.tasks = append(q.tasks, frame) + q.count++ + select { + case q.notifs <- struct{}{}: + default: + } +} + +// pop returns the first item of the queue, if there is one. +// The second return value will indicate if a task was returned. +// You must check this value, even after calling wait(). +func (q *fifoQueue) pop() (*inputTask, bool) { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count == 0 { + return nil, false + } + frame := q.tasks[0] + q.tasks[0] = nil + q.tasks = q.tasks[1:] + q.count-- + if q.count == 0 { + // Force a GC of the underlying array, since it might have + // grown significantly if the queue was hammered for some reason + q.tasks = nil + } + return frame, true +} + +// wait returns a channel which can be used to detect when an +// item is waiting in the queue. +func (q *fifoQueue) wait() <-chan struct{} { + q.mutex.Lock() + defer q.mutex.Unlock() + if q.count > 0 && len(q.notifs) == 0 { + ch := make(chan struct{}) + close(ch) + return ch + } + return q.notifs +}