Read batches from incoming channels (#1067)

main
Neil Alexander 2020-05-27 12:16:53 +01:00 committed by GitHub
parent 406b47267e
commit 57841fc35e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 0 deletions

View File

@ -110,12 +110,26 @@ func (oq *destinationQueue) backgroundSend() {
// of the queue and they will all be added to transactions // of the queue and they will all be added to transactions
// in order. // in order.
oq.pendingPDUs = append(oq.pendingPDUs, pdu) oq.pendingPDUs = append(oq.pendingPDUs, pdu)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingPDUs) > 0 {
oq.pendingPDUs = append(oq.pendingPDUs, <-oq.incomingPDUs)
}
case edu := <-oq.incomingEDUs: case edu := <-oq.incomingEDUs:
// Likewise for EDUs, although we should probably not try // Likewise for EDUs, although we should probably not try
// too hard with some EDUs (like typing notifications) after // too hard with some EDUs (like typing notifications) after
// a certain amount of time has passed. // a certain amount of time has passed.
// TODO: think about EDU expiry some more // TODO: think about EDU expiry some more
oq.pendingEDUs = append(oq.pendingEDUs, edu) oq.pendingEDUs = append(oq.pendingEDUs, edu)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingEDUs) > 0 {
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
}
case invite := <-oq.incomingInvites: case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like // There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the // there is for transactions, so we put the invite onto the
@ -126,6 +140,13 @@ func (oq *destinationQueue) backgroundSend() {
[]*gomatrixserverlib.InviteV2Request{invite}, []*gomatrixserverlib.InviteV2Request{invite},
oq.pendingInvites..., oq.pendingInvites...,
) )
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingInvites) > 0 {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
}
case <-time.After(time.Second * 30): case <-time.After(time.Second * 30):
// The worker is idle so stop the goroutine. It'll // The worker is idle so stop the goroutine. It'll
// get restarted automatically the next time we // get restarted automatically the next time we