bugfix: continue sending PDUs if ones are added whilst sending another PDU (#1187)

* Add a bit more logging to the fedsender

* bugfix: continue sending PDUs if ones are added whilst sending another PDU

Without this, the queue goes back to sleep on `<-oq.notifyPDUs` which won't
fire because `pendingPDUs` is already > 0. This should fix a flakey sytest.

* Break if no txn is sent

* Tweak federation sender wake-ups

* Update comments

* Remove break or that'll kill the parent loop

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
main
Kegsay 2020-07-07 16:36:10 +01:00 committed by GitHub
parent d03f6ac725
commit 8e9580852d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 1 deletions

View File

@ -65,6 +65,7 @@ type destinationQueue struct {
func (oq *destinationQueue) sendEvent(nid int64) { func (oq *destinationQueue) sendEvent(nid int64) {
if oq.statistics.Blacklisted() { if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event. // If the destination is blacklisted then drop the event.
log.Infof("%s is blacklisted; dropping event", oq.destination)
return return
} }
oq.wakeQueueIfNeeded() oq.wakeQueueIfNeeded()
@ -214,6 +215,7 @@ func (oq *destinationQueue) backgroundSend() {
// backoff duration to complete first, or until explicitly // backoff duration to complete first, or until explicitly
// told to retry. // told to retry.
if backoff, duration := oq.statistics.BackoffDuration(); backoff { if backoff, duration := oq.statistics.BackoffDuration(); backoff {
log.WithField("duration", duration).Infof("Backing off %s", oq.destination)
oq.backingOff.Store(true) oq.backingOff.Store(true)
select { select {
case <-time.After(duration): case <-time.After(duration):
@ -271,6 +273,17 @@ func (oq *destinationQueue) backgroundSend() {
oq.cleanPendingInvites() oq.cleanPendingInvites()
} }
} }
// If something else has come along while we were busy sending
// the previous transaction then we don't want the next loop
// iteration to sleep. Send a message if someone else hasn't
// already sent a wake-up.
if oq.pendingPDUs.Load() > 0 {
select {
case oq.notifyPDUs <- true:
default:
}
}
} }
} }
@ -336,6 +349,7 @@ func (oq *destinationQueue) nextTransaction(
// If we didn't get anything from the database and there are no // If we didn't get anything from the database and there are no
// pending EDUs then there's nothing to do - stop here. // pending EDUs then there's nothing to do - stop here.
if len(pdus) == 0 && len(pendingEDUs) == 0 { if len(pdus) == 0 && len(pendingEDUs) == 0 {
log.Warnf("no pdus/edus for nextTransaction for destination %q", oq.destination)
return false, nil return false, nil
} }

View File

@ -93,7 +93,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
statistics: oqs.statistics.ForServer(destination), statistics: oqs.statistics.ForServer(destination),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
notifyPDUs: make(chan bool, 128), notifyPDUs: make(chan bool, 1),
interruptBackoff: make(chan bool), interruptBackoff: make(chan bool),
signing: oqs.signing, signing: oqs.signing,
} }