From 8e9580852d32f870693103c424a71a30c811fcca Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 7 Jul 2020 16:36:10 +0100 Subject: [PATCH] bugfix: continue sending PDUs if ones are added whilst sending another PDU (#1187) * Add a bit more logging to the fedsender * bugfix: continue sending PDUs if ones are added whilst sending another PDU Without this, the queue goes back to sleep on `<-oq.notifyPDUs` which won't fire because `pendingPDUs` is already > 0. This should fix a flakey sytest. * Break if no txn is sent * Tweak federation sender wake-ups * Update comments * Remove break or that'll kill the parent loop Co-authored-by: Neil Alexander --- federationsender/queue/destinationqueue.go | 14 ++++++++++++++ federationsender/queue/queue.go | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 70e50d2e..845600c9 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -65,6 +65,7 @@ type destinationQueue struct { func (oq *destinationQueue) sendEvent(nid int64) { if oq.statistics.Blacklisted() { // If the destination is blacklisted then drop the event. + log.Infof("%s is blacklisted; dropping event", oq.destination) return } oq.wakeQueueIfNeeded() @@ -214,6 +215,7 @@ func (oq *destinationQueue) backgroundSend() { // backoff duration to complete first, or until explicitly // told to retry. if backoff, duration := oq.statistics.BackoffDuration(); backoff { + log.WithField("duration", duration).Infof("Backing off %s", oq.destination) oq.backingOff.Store(true) select { case <-time.After(duration): @@ -271,6 +273,17 @@ func (oq *destinationQueue) backgroundSend() { oq.cleanPendingInvites() } } + + // If something else has come along while we were busy sending + // the previous transaction then we don't want the next loop + // iteration to sleep. Send a message if someone else hasn't + // already sent a wake-up. + if oq.pendingPDUs.Load() > 0 { + select { + case oq.notifyPDUs <- true: + default: + } + } } } @@ -336,6 +349,7 @@ func (oq *destinationQueue) nextTransaction( // If we didn't get anything from the database and there are no // pending EDUs then there's nothing to do - stop here. if len(pdus) == 0 && len(pendingEDUs) == 0 { + log.Warnf("no pdus/edus for nextTransaction for destination %q", oq.destination) return false, nil } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 6c7fca54..2288689e 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -93,7 +93,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d statistics: oqs.statistics.ForServer(destination), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), - notifyPDUs: make(chan bool, 128), + notifyPDUs: make(chan bool, 1), interruptBackoff: make(chan bool), signing: oqs.signing, }