diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go index c43d50a4..9ea9d088 100644 --- a/syncapi/streams/stream_devicelist.go +++ b/syncapi/streams/stream_devicelist.go @@ -19,7 +19,7 @@ func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.LogPosition { - return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx)) + return p.LatestPosition(ctx) } func (p *DeviceListStreamProvider) IncrementalSync( diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index d6d7ff44..ae38dc30 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -2,18 +2,54 @@ package streams import ( "context" + "sync" + "time" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" ) +// The max number of per-room goroutines to have running. +// Too high and this will consume lots of CPU, too low and complete +// sync responses will take longer to process. +const PDU_STREAM_WORKERS = 256 + +// The maximum number of tasks that can be queued in total before +// backpressure will build up and the rests will start to block. +const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8 + type PDUStreamProvider struct { StreamProvider + + tasks chan func() + workers atomic.Int32 +} + +func (p *PDUStreamProvider) worker() { + defer p.workers.Dec() + for { + select { + case f := <-p.tasks: + f() + case <-time.After(time.Second * 10): + return + } + } +} + +func (p *PDUStreamProvider) queue(f func()) { + if p.workers.Load() < PDU_STREAM_WORKERS { + p.workers.Inc() + go p.worker() + } + p.tasks <- f } func (p *PDUStreamProvider) Setup() { p.StreamProvider.Setup() + p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE) p.latestMutex.Lock() defer p.latestMutex.Unlock() @@ -52,19 +88,32 @@ func (p *PDUStreamProvider) CompleteSync( eventFilter := req.Filter.Room.Timeline // Build up a /sync response. Add joined rooms. - for _, roomID := range joinedRoomIDs { - var jr *types.JoinResponse - jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.Device, - ) - if err != nil { - req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") - return from - } - req.Response.Rooms.Join[roomID] = *jr - req.Rooms[roomID] = gomatrixserverlib.Join + var reqMutex sync.Mutex + var reqWaitGroup sync.WaitGroup + reqWaitGroup.Add(len(joinedRoomIDs)) + for _, room := range joinedRoomIDs { + roomID := room + p.queue(func() { + defer reqWaitGroup.Done() + + var jr *types.JoinResponse + jr, err = p.getJoinResponseForCompleteSync( + ctx, roomID, r, &stateFilter, &eventFilter, req.Device, + ) + if err != nil { + req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") + return + } + + reqMutex.Lock() + defer reqMutex.Unlock() + req.Response.Rooms.Join[roomID] = *jr + req.Rooms[roomID] = gomatrixserverlib.Join + }) } + reqWaitGroup.Wait() + // Add peeked rooms. peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) if err != nil {