Wake up destination queues more aggressively (#1183)
* Wake up destination queues more aggressively * We don't really need Ch here do we
This commit is contained in:
parent
ce2a285887
commit
46dbc46f84
2 changed files with 29 additions and 50 deletions
|
@ -55,32 +55,8 @@ type destinationQueue struct {
|
|||
pendingPDUs atomic.Int64 // how many PDUs are waiting to be sent
|
||||
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
|
||||
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
|
||||
wakeServerCh chan bool // interrupts idle wait
|
||||
retryServerCh chan bool // interrupts backoff
|
||||
}
|
||||
|
||||
// retry will clear the blacklist state and attempt to send built up events to the server,
|
||||
// resetting and interrupting any backoff timers.
|
||||
func (oq *destinationQueue) retry() {
|
||||
// TODO: We don't send all events in the case where the server has been blacklisted as we
|
||||
// drop events instead then. This means we will send the oldest N events (chan size, currently 128)
|
||||
// and then skip ahead a lot which feels non-ideal but equally we can't persist thousands of events
|
||||
// in-memory to maybe-send it one day. Ideally we would just shove these pending events in a database
|
||||
// so we can send a lot of events.
|
||||
//
|
||||
// Interrupt the backoff. If the federation request that happens as a result of this is successful
|
||||
// then the counters will be reset there and the backoff will cancel. If the federation request
|
||||
// fails then we will retry at the current backoff interval, so as to prevent us from spamming
|
||||
// homeservers which are behaving badly.
|
||||
// We need to use an atomic bool here to prevent multiple calls to retry() blocking on the channel
|
||||
// as it is unbuffered.
|
||||
if oq.backingOff.CAS(true, false) {
|
||||
oq.retryServerCh <- true
|
||||
}
|
||||
if !oq.running.Load() {
|
||||
log.Infof("Restarting queue for %s", oq.destination)
|
||||
go oq.backgroundSend()
|
||||
}
|
||||
notifyPDUs chan bool // interrupts idle wait for PDUs
|
||||
interruptBackoff chan bool // interrupts backoff
|
||||
}
|
||||
|
||||
// Send event adds the event to the pending queue for the destination.
|
||||
|
@ -122,7 +98,7 @@ func (oq *destinationQueue) sendEvent(nid int64) {
|
|||
// wake up if it's asleep. The return to the Add function will only
|
||||
// be 1 if the previous value was 0, e.g. nothing was waiting before.
|
||||
if oq.pendingPDUs.Add(1) == 1 {
|
||||
oq.wakeServerCh <- true
|
||||
oq.notifyPDUs <- true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,7 +126,16 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
|
|||
oq.incomingInvites <- ev
|
||||
}
|
||||
|
||||
// wakeQueueIfNeeded will wake up the destination queue if it is
|
||||
// not already running. If it is running but it is backing off
|
||||
// then we will interrupt the backoff, causing any federation
|
||||
// requests to retry.
|
||||
func (oq *destinationQueue) wakeQueueIfNeeded() {
|
||||
// If we are backing off then interrupt the backoff.
|
||||
if oq.backingOff.CAS(true, false) {
|
||||
oq.interruptBackoff <- true
|
||||
}
|
||||
// If we aren't running then wake up the queue.
|
||||
if !oq.running.Load() {
|
||||
// Look up how many events are pending in this queue. We need
|
||||
// to do this so that the queue thinks it has work to do.
|
||||
|
@ -165,7 +150,7 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
|
|||
log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination)
|
||||
}
|
||||
if count > 0 {
|
||||
oq.wakeServerCh <- true
|
||||
oq.notifyPDUs <- true
|
||||
}
|
||||
// Then start the queue.
|
||||
go oq.backgroundSend()
|
||||
|
@ -186,7 +171,7 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// If we have nothing to do then wait either for incoming events, or
|
||||
// until we hit an idle timeout.
|
||||
select {
|
||||
case <-oq.wakeServerCh:
|
||||
case <-oq.notifyPDUs:
|
||||
// We were woken up because there are new PDUs waiting in the
|
||||
// database.
|
||||
case edu := <-oq.incomingEDUs:
|
||||
|
@ -232,7 +217,7 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
oq.backingOff.Store(true)
|
||||
select {
|
||||
case <-time.After(duration):
|
||||
case <-oq.retryServerCh:
|
||||
case <-oq.interruptBackoff:
|
||||
}
|
||||
oq.backingOff.Store(false)
|
||||
}
|
||||
|
@ -255,7 +240,7 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// PDUs waiting to be sent. By sending a message into the wake chan,
|
||||
// the next loop iteration will try processing these PDUs again,
|
||||
// subject to the backoff.
|
||||
oq.wakeServerCh <- true
|
||||
oq.notifyPDUs <- true
|
||||
}
|
||||
} else if transaction {
|
||||
// If we successfully sent the transaction then clear out
|
||||
|
|
|
@ -79,29 +79,23 @@ type SigningInfo struct {
|
|||
PrivateKey ed25519.PrivateKey
|
||||
}
|
||||
|
||||
func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue {
|
||||
oqs.queuesMutex.Lock()
|
||||
defer oqs.queuesMutex.Unlock()
|
||||
return oqs.queues[destination]
|
||||
}
|
||||
|
||||
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
|
||||
oqs.queuesMutex.Lock()
|
||||
defer oqs.queuesMutex.Unlock()
|
||||
oq := oqs.queues[destination]
|
||||
if oq == nil {
|
||||
oq = &destinationQueue{
|
||||
db: oqs.db,
|
||||
rsAPI: oqs.rsAPI,
|
||||
origin: oqs.origin,
|
||||
destination: destination,
|
||||
client: oqs.client,
|
||||
statistics: oqs.statistics.ForServer(destination),
|
||||
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
|
||||
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
|
||||
wakeServerCh: make(chan bool, 128),
|
||||
retryServerCh: make(chan bool),
|
||||
signing: oqs.signing,
|
||||
db: oqs.db,
|
||||
rsAPI: oqs.rsAPI,
|
||||
origin: oqs.origin,
|
||||
destination: destination,
|
||||
client: oqs.client,
|
||||
statistics: oqs.statistics.ForServer(destination),
|
||||
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
|
||||
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
|
||||
notifyPDUs: make(chan bool, 128),
|
||||
interruptBackoff: make(chan bool),
|
||||
signing: oqs.signing,
|
||||
}
|
||||
oqs.queues[destination] = oq
|
||||
}
|
||||
|
@ -211,11 +205,11 @@ func (oqs *OutgoingQueues) SendEDU(
|
|||
|
||||
// RetryServer attempts to resend events to the given server if we had given up.
|
||||
func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
|
||||
q := oqs.getQueueIfExists(srv)
|
||||
q := oqs.getQueue(srv)
|
||||
if q == nil {
|
||||
return
|
||||
}
|
||||
q.retry()
|
||||
q.wakeQueueIfNeeded()
|
||||
}
|
||||
|
||||
// filterAndDedupeDests removes our own server from the list of destinations
|
||||
|
|
Loading…
Reference in a new issue