Persistent federation sender queues (PDUs) (#1173)
* Initial work on persistent queues * Update index for event ID and server name * Put things into database (postgres for now) * Duplicate postgres code into sqlite for now just to stop build errors, will fix SQLite soon * Fix table name * Fix index * Fix table name * Use RETURNING because LastInsertID is not supported by postgres * Use functions * Marshal headered event * Don't error on now rows * Don't block if there are PDUs waiting * Try to tidy up JSON * Debug logging * Fix query, use transactions in postgres * Clean up * Rehydrate more opportunistically * Fix SQLite * remove unused types * Review comments * Shuffle things around a bit * Clean up transaction properly * Don't send empty transactions * Reduce unnecessary retries * Count PDUs to make more resilient * Don't stop when there is work to be done * Try to limit wakeups * well this is tedious * Fix race in incomplete transactions * Thread safety on transaction ID/countmain
parent
6f49758b90
commit
42dd962425
|
@ -130,8 +130,9 @@ func main() {
|
||||||
cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
|
cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
|
||||||
cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
|
cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
|
||||||
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
|
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
|
||||||
cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName))
|
cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicrooms.db", *instanceName))
|
||||||
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
|
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
|
||||||
|
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
|
||||||
if err = cfg.Derive(); err != nil {
|
if err = cfg.Derive(); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,8 @@ func main() {
|
||||||
cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
|
cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
|
||||||
cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
|
cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
|
||||||
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
|
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
|
||||||
cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName))
|
cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicrooms.db", *instanceName))
|
||||||
|
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
|
||||||
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
|
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
|
||||||
if err = cfg.Derive(); err != nil {
|
if err = cfg.Derive(); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
|
@ -50,7 +50,8 @@ func NewInternalAPI(
|
||||||
|
|
||||||
statistics := &types.Statistics{}
|
statistics := &types.Statistics{}
|
||||||
queues := queue.NewOutgoingQueues(
|
queues := queue.NewOutgoingQueues(
|
||||||
base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{
|
federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, statistics,
|
||||||
|
&queue.SigningInfo{
|
||||||
KeyID: base.Cfg.Matrix.KeyID,
|
KeyID: base.Cfg.Matrix.KeyID,
|
||||||
PrivateKey: base.Cfg.Matrix.PrivateKey,
|
PrivateKey: base.Cfg.Matrix.PrivateKey,
|
||||||
ServerName: base.Cfg.Matrix.ServerName,
|
ServerName: base.Cfg.Matrix.ServerName,
|
||||||
|
|
|
@ -18,8 +18,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/federationsender/types"
|
"github.com/matrix-org/dendrite/federationsender/types"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
|
@ -29,11 +31,14 @@ import (
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const maxPDUsPerTransaction = 50
|
||||||
|
|
||||||
// destinationQueue is a queue of events for a single destination.
|
// destinationQueue is a queue of events for a single destination.
|
||||||
// It is responsible for sending the events to the destination and
|
// It is responsible for sending the events to the destination and
|
||||||
// ensures that only one request is in flight to a given destination
|
// ensures that only one request is in flight to a given destination
|
||||||
// at a time.
|
// at a time.
|
||||||
type destinationQueue struct {
|
type destinationQueue struct {
|
||||||
|
db storage.Database
|
||||||
signing *SigningInfo
|
signing *SigningInfo
|
||||||
rsAPI api.RoomserverInternalAPI
|
rsAPI api.RoomserverInternalAPI
|
||||||
client *gomatrixserverlib.FederationClient // federation client
|
client *gomatrixserverlib.FederationClient // federation client
|
||||||
|
@ -42,13 +47,15 @@ type destinationQueue struct {
|
||||||
running atomic.Bool // is the queue worker running?
|
running atomic.Bool // is the queue worker running?
|
||||||
backingOff atomic.Bool // true if we're backing off
|
backingOff atomic.Bool // true if we're backing off
|
||||||
statistics *types.ServerStatistics // statistics about this remote server
|
statistics *types.ServerStatistics // statistics about this remote server
|
||||||
incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
|
|
||||||
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
|
|
||||||
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
|
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
|
||||||
lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID
|
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
|
||||||
pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend
|
transactionIDMutex sync.Mutex // protects transactionID
|
||||||
|
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||||
|
transactionCount atomic.Int32 // how many events in this transaction so far
|
||||||
|
pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
|
||||||
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
|
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
|
||||||
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
|
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
|
||||||
|
wakeServerCh chan bool // interrupts idle wait
|
||||||
retryServerCh chan bool // interrupts backoff
|
retryServerCh chan bool // interrupts backoff
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,15 +86,47 @@ func (oq *destinationQueue) retry() {
|
||||||
// Send event adds the event to the pending queue for the destination.
|
// Send event adds the event to the pending queue for the destination.
|
||||||
// If the queue is empty then it starts a background goroutine to
|
// If the queue is empty then it starts a background goroutine to
|
||||||
// start sending events to that destination.
|
// start sending events to that destination.
|
||||||
func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
func (oq *destinationQueue) sendEvent(nid int64) {
|
||||||
if oq.statistics.Blacklisted() {
|
if oq.statistics.Blacklisted() {
|
||||||
// If the destination is blacklisted then drop the event.
|
// If the destination is blacklisted then drop the event.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Create a transaction ID. We'll either do this if we don't have
|
||||||
|
// one made up yet, or if we've exceeded the number of maximum
|
||||||
|
// events allowed in a single tranaction. We'll reset the counter
|
||||||
|
// when we do.
|
||||||
|
oq.transactionIDMutex.Lock()
|
||||||
|
if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction {
|
||||||
|
now := gomatrixserverlib.AsTimestamp(time.Now())
|
||||||
|
oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
|
||||||
|
oq.transactionCount.Store(0)
|
||||||
|
}
|
||||||
|
oq.transactionIDMutex.Unlock()
|
||||||
|
// Create a database entry that associates the given PDU NID with
|
||||||
|
// this destination queue. We'll then be able to retrieve the PDU
|
||||||
|
// later.
|
||||||
|
if err := oq.db.AssociatePDUWithDestination(
|
||||||
|
context.TODO(),
|
||||||
|
oq.transactionID, // the current transaction ID
|
||||||
|
oq.destination, // the destination server name
|
||||||
|
[]int64{nid}, // NID from federationsender_queue_json table
|
||||||
|
); err != nil {
|
||||||
|
log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// We've successfully added a PDU to the transaction so increase
|
||||||
|
// the counter.
|
||||||
|
oq.transactionCount.Add(1)
|
||||||
|
// If the queue isn't running at this point then start it.
|
||||||
if !oq.running.Load() {
|
if !oq.running.Load() {
|
||||||
go oq.backgroundSend()
|
go oq.backgroundSend()
|
||||||
}
|
}
|
||||||
oq.incomingPDUs <- ev
|
// Signal that we've sent a new PDU. This will cause the queue to
|
||||||
|
// wake up if it's asleep. The return to the Add function will only
|
||||||
|
// be 1 if the previous value was 0, e.g. nothing was waiting before.
|
||||||
|
if oq.pendingPDUs.Add(1) == 1 {
|
||||||
|
oq.wakeServerCh <- true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendEDU adds the EDU event to the pending queue for the destination.
|
// sendEDU adds the EDU event to the pending queue for the destination.
|
||||||
|
@ -129,26 +168,17 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
defer oq.running.Store(false)
|
defer oq.running.Store(false)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// Wait either for incoming events, or until we hit an
|
// If we have nothing to do then wait either for incoming events, or
|
||||||
// idle timeout.
|
// until we hit an idle timeout.
|
||||||
|
if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 {
|
||||||
select {
|
select {
|
||||||
case pdu := <-oq.incomingPDUs:
|
case <-oq.wakeServerCh:
|
||||||
// Ordering of PDUs is important so we add them to the end
|
// We were woken up because there are new PDUs waiting in the
|
||||||
// of the queue and they will all be added to transactions
|
// database.
|
||||||
// in order.
|
|
||||||
oq.pendingPDUs = append(oq.pendingPDUs, pdu)
|
|
||||||
// If there are any more things waiting in the channel queue
|
|
||||||
// then read them. This is safe because we guarantee only
|
|
||||||
// having one goroutine per destination queue, so the channel
|
|
||||||
// isn't being consumed anywhere else.
|
|
||||||
for len(oq.incomingPDUs) > 0 {
|
|
||||||
oq.pendingPDUs = append(oq.pendingPDUs, <-oq.incomingPDUs)
|
|
||||||
}
|
|
||||||
case edu := <-oq.incomingEDUs:
|
case edu := <-oq.incomingEDUs:
|
||||||
// Likewise for EDUs, although we should probably not try
|
// EDUs are handled in-memory for now. We will try to keep
|
||||||
// too hard with some EDUs (like typing notifications) after
|
// the ordering intact.
|
||||||
// a certain amount of time has passed.
|
// TODO: Certain EDU types need persistence, e.g. send-to-device
|
||||||
// TODO: think about EDU expiry some more
|
|
||||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
||||||
// If there are any more things waiting in the channel queue
|
// If there are any more things waiting in the channel queue
|
||||||
// then read them. This is safe because we guarantee only
|
// then read them. This is safe because we guarantee only
|
||||||
|
@ -175,11 +205,12 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
||||||
}
|
}
|
||||||
case <-time.After(time.Second * 30):
|
case <-time.After(time.Second * 30):
|
||||||
// The worker is idle so stop the goroutine. It'll
|
// The worker is idle so stop the goroutine. It'll get
|
||||||
// get restarted automatically the next time we
|
// restarted automatically the next time we have an event to
|
||||||
// get an event.
|
// send.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If we are backing off this server then wait for the
|
// If we are backing off this server then wait for the
|
||||||
// backoff duration to complete first, or until explicitly
|
// backoff duration to complete first, or until explicitly
|
||||||
|
@ -193,47 +224,31 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
oq.backingOff.Store(false)
|
oq.backingOff.Store(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// How many things do we have waiting?
|
|
||||||
numPDUs := len(oq.pendingPDUs)
|
|
||||||
numEDUs := len(oq.pendingEDUs)
|
|
||||||
numInvites := len(oq.pendingInvites)
|
|
||||||
|
|
||||||
// If we have pending PDUs or EDUs then construct a transaction.
|
// If we have pending PDUs or EDUs then construct a transaction.
|
||||||
if numPDUs > 0 || numEDUs > 0 {
|
if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 {
|
||||||
// Try sending the next transaction and see what happens.
|
// Try sending the next transaction and see what happens.
|
||||||
transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount())
|
transaction, terr := oq.nextTransaction(oq.pendingEDUs)
|
||||||
if terr != nil {
|
if terr != nil {
|
||||||
// We failed to send the transaction.
|
// We failed to send the transaction.
|
||||||
if giveUp := oq.statistics.Failure(); giveUp {
|
if giveUp := oq.statistics.Failure(); giveUp {
|
||||||
// It's been suggested that we should give up because
|
// It's been suggested that we should give up because the backoff
|
||||||
// the backoff has exceeded a maximum allowable value.
|
// has exceeded a maximum allowable value. Clean up the in-memory
|
||||||
|
// buffers at this point. The PDU clean-up is already on a defer.
|
||||||
|
oq.cleanPendingEDUs()
|
||||||
|
oq.cleanPendingInvites()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else if transaction {
|
} else if transaction {
|
||||||
// If we successfully sent the transaction then clear out
|
// If we successfully sent the transaction then clear out
|
||||||
// the pending events and EDUs.
|
// the pending events and EDUs, and wipe our transaction ID.
|
||||||
oq.statistics.Success()
|
oq.statistics.Success()
|
||||||
// Reallocate so that the underlying arrays can be GC'd, as
|
// Clean up the in-memory buffers.
|
||||||
// opposed to growing forever.
|
oq.cleanPendingEDUs()
|
||||||
for i := 0; i < numPDUs; i++ {
|
|
||||||
oq.pendingPDUs[i] = nil
|
|
||||||
}
|
|
||||||
for i := 0; i < numEDUs; i++ {
|
|
||||||
oq.pendingEDUs[i] = nil
|
|
||||||
}
|
|
||||||
oq.pendingPDUs = append(
|
|
||||||
[]*gomatrixserverlib.HeaderedEvent{},
|
|
||||||
oq.pendingPDUs[numPDUs:]...,
|
|
||||||
)
|
|
||||||
oq.pendingEDUs = append(
|
|
||||||
[]*gomatrixserverlib.EDU{},
|
|
||||||
oq.pendingEDUs[numEDUs:]...,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try sending the next invite and see what happens.
|
// Try sending the next invite and see what happens.
|
||||||
if numInvites > 0 {
|
if len(oq.pendingInvites) > 0 {
|
||||||
sent, ierr := oq.nextInvites(oq.pendingInvites)
|
sent, ierr := oq.nextInvites(oq.pendingInvites)
|
||||||
if ierr != nil {
|
if ierr != nil {
|
||||||
// We failed to send the transaction so increase the
|
// We failed to send the transaction so increase the
|
||||||
|
@ -249,59 +264,117 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
oq.statistics.Success()
|
oq.statistics.Success()
|
||||||
// Reallocate so that the underlying array can be GC'd, as
|
// Reallocate so that the underlying array can be GC'd, as
|
||||||
// opposed to growing forever.
|
// opposed to growing forever.
|
||||||
oq.pendingInvites = append(
|
oq.cleanPendingInvites()
|
||||||
[]*gomatrixserverlib.InviteV2Request{},
|
|
||||||
oq.pendingInvites[sent:]...,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cleanPendingEDUs cleans out the pending EDU buffer, removing
|
||||||
|
// all references so that the underlying objects can be GC'd.
|
||||||
|
func (oq *destinationQueue) cleanPendingEDUs() {
|
||||||
|
for i := 0; i < len(oq.pendingEDUs); i++ {
|
||||||
|
oq.pendingEDUs[i] = nil
|
||||||
|
}
|
||||||
|
oq.pendingEDUs = []*gomatrixserverlib.EDU{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanPendingInvites cleans out the pending invite buffer,
|
||||||
|
// removing all references so that the underlying objects can
|
||||||
|
// be GC'd.
|
||||||
|
func (oq *destinationQueue) cleanPendingInvites() {
|
||||||
|
for i := 0; i < len(oq.pendingInvites); i++ {
|
||||||
|
oq.pendingInvites[i] = nil
|
||||||
|
}
|
||||||
|
oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{}
|
||||||
|
}
|
||||||
|
|
||||||
// nextTransaction creates a new transaction from the pending event
|
// nextTransaction creates a new transaction from the pending event
|
||||||
// queue and sends it. Returns true if a transaction was sent or
|
// queue and sends it. Returns true if a transaction was sent or
|
||||||
// false otherwise.
|
// false otherwise.
|
||||||
func (oq *destinationQueue) nextTransaction(
|
func (oq *destinationQueue) nextTransaction(
|
||||||
pendingPDUs []*gomatrixserverlib.HeaderedEvent,
|
|
||||||
pendingEDUs []*gomatrixserverlib.EDU,
|
pendingEDUs []*gomatrixserverlib.EDU,
|
||||||
sentCounter uint32,
|
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
|
// Before we do anything, we need to roll over the transaction
|
||||||
|
// ID that is being used to coalesce events into the next TX.
|
||||||
|
// Otherwise it's possible that we'll pick up an incomplete
|
||||||
|
// transaction and end up nuking the rest of the events at the
|
||||||
|
// cleanup stage.
|
||||||
|
oq.transactionIDMutex.Lock()
|
||||||
|
oq.transactionID = ""
|
||||||
|
oq.transactionIDMutex.Unlock()
|
||||||
|
oq.transactionCount.Store(0)
|
||||||
|
|
||||||
|
// Create the transaction.
|
||||||
t := gomatrixserverlib.Transaction{
|
t := gomatrixserverlib.Transaction{
|
||||||
PDUs: []json.RawMessage{},
|
PDUs: []json.RawMessage{},
|
||||||
EDUs: []gomatrixserverlib.EDU{},
|
EDUs: []gomatrixserverlib.EDU{},
|
||||||
}
|
}
|
||||||
now := gomatrixserverlib.AsTimestamp(time.Now())
|
|
||||||
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, sentCounter))
|
|
||||||
t.Origin = oq.origin
|
t.Origin = oq.origin
|
||||||
t.Destination = oq.destination
|
t.Destination = oq.destination
|
||||||
t.OriginServerTS = now
|
t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
|
||||||
t.PreviousIDs = oq.lastTransactionIDs
|
|
||||||
if t.PreviousIDs == nil {
|
// Ask the database for any pending PDUs from the next transaction.
|
||||||
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
|
// maxPDUsPerTransaction is an upper limit but we probably won't
|
||||||
|
// actually retrieve that many events.
|
||||||
|
txid, pdus, err := oq.db.GetNextTransactionPDUs(
|
||||||
|
context.TODO(), // context
|
||||||
|
oq.destination, // server name
|
||||||
|
maxPDUsPerTransaction, // max events to retrieve
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
|
||||||
|
return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
|
// If we didn't get anything from the database and there are no
|
||||||
|
// pending EDUs then there's nothing to do - stop here.
|
||||||
|
if len(pdus) == 0 && len(pendingEDUs) == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
for _, pdu := range pendingPDUs {
|
// Pick out the transaction ID from the database. If we didn't
|
||||||
|
// get a transaction ID (i.e. because there are no PDUs but only
|
||||||
|
// EDUs) then generate a transaction ID.
|
||||||
|
t.TransactionID = txid
|
||||||
|
if t.TransactionID == "" {
|
||||||
|
now := gomatrixserverlib.AsTimestamp(time.Now())
|
||||||
|
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go through PDUs that we retrieved from the database, if any,
|
||||||
|
// and add them into the transaction.
|
||||||
|
for _, pdu := range pdus {
|
||||||
// Append the JSON of the event, since this is a json.RawMessage type in the
|
// Append the JSON of the event, since this is a json.RawMessage type in the
|
||||||
// gomatrixserverlib.Transaction struct
|
// gomatrixserverlib.Transaction struct
|
||||||
t.PDUs = append(t.PDUs, (*pdu).JSON())
|
t.PDUs = append(t.PDUs, (*pdu).JSON())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Do the same for pending EDUS in the queue.
|
||||||
for _, edu := range pendingEDUs {
|
for _, edu := range pendingEDUs {
|
||||||
t.EDUs = append(t.EDUs, *edu)
|
t.EDUs = append(t.EDUs, *edu)
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
|
logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
|
||||||
|
|
||||||
|
// Try to send the transaction to the destination server.
|
||||||
// TODO: we should check for 500-ish fails vs 400-ish here,
|
// TODO: we should check for 500-ish fails vs 400-ish here,
|
||||||
// since we shouldn't queue things indefinitely in response
|
// since we shouldn't queue things indefinitely in response
|
||||||
// to a 400-ish error
|
// to a 400-ish error
|
||||||
_, err := oq.client.SendTransaction(context.TODO(), t)
|
_, err = oq.client.SendTransaction(context.TODO(), t)
|
||||||
switch e := err.(type) {
|
switch e := err.(type) {
|
||||||
case nil:
|
case nil:
|
||||||
// No error was returned so the transaction looks to have
|
// No error was returned so the transaction looks to have
|
||||||
// been successfully sent.
|
// been successfully sent.
|
||||||
|
oq.pendingPDUs.Sub(int32(len(t.PDUs)))
|
||||||
|
// Clean up the transaction in the database.
|
||||||
|
if err = oq.db.CleanTransactionPDUs(
|
||||||
|
context.TODO(),
|
||||||
|
t.Destination,
|
||||||
|
t.TransactionID,
|
||||||
|
); err != nil {
|
||||||
|
log.WithError(err).Errorf("failed to clean transaction %q for server %q", t.TransactionID, t.Destination)
|
||||||
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
case gomatrix.HTTPError:
|
case gomatrix.HTTPError:
|
||||||
// We received a HTTP error back. In this instance we only
|
// We received a HTTP error back. In this instance we only
|
||||||
|
|
|
@ -15,10 +15,13 @@
|
||||||
package queue
|
package queue
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/federationsender/types"
|
"github.com/matrix-org/dendrite/federationsender/types"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -29,6 +32,7 @@ import (
|
||||||
// OutgoingQueues is a collection of queues for sending transactions to other
|
// OutgoingQueues is a collection of queues for sending transactions to other
|
||||||
// matrix servers
|
// matrix servers
|
||||||
type OutgoingQueues struct {
|
type OutgoingQueues struct {
|
||||||
|
db storage.Database
|
||||||
rsAPI api.RoomserverInternalAPI
|
rsAPI api.RoomserverInternalAPI
|
||||||
origin gomatrixserverlib.ServerName
|
origin gomatrixserverlib.ServerName
|
||||||
client *gomatrixserverlib.FederationClient
|
client *gomatrixserverlib.FederationClient
|
||||||
|
@ -40,6 +44,7 @@ type OutgoingQueues struct {
|
||||||
|
|
||||||
// NewOutgoingQueues makes a new OutgoingQueues
|
// NewOutgoingQueues makes a new OutgoingQueues
|
||||||
func NewOutgoingQueues(
|
func NewOutgoingQueues(
|
||||||
|
db storage.Database,
|
||||||
origin gomatrixserverlib.ServerName,
|
origin gomatrixserverlib.ServerName,
|
||||||
client *gomatrixserverlib.FederationClient,
|
client *gomatrixserverlib.FederationClient,
|
||||||
rsAPI api.RoomserverInternalAPI,
|
rsAPI api.RoomserverInternalAPI,
|
||||||
|
@ -47,6 +52,7 @@ func NewOutgoingQueues(
|
||||||
signing *SigningInfo,
|
signing *SigningInfo,
|
||||||
) *OutgoingQueues {
|
) *OutgoingQueues {
|
||||||
return &OutgoingQueues{
|
return &OutgoingQueues{
|
||||||
|
db: db,
|
||||||
rsAPI: rsAPI,
|
rsAPI: rsAPI,
|
||||||
origin: origin,
|
origin: origin,
|
||||||
client: client,
|
client: client,
|
||||||
|
@ -76,14 +82,15 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
||||||
oq := oqs.queues[destination]
|
oq := oqs.queues[destination]
|
||||||
if oq == nil {
|
if oq == nil {
|
||||||
oq = &destinationQueue{
|
oq = &destinationQueue{
|
||||||
|
db: oqs.db,
|
||||||
rsAPI: oqs.rsAPI,
|
rsAPI: oqs.rsAPI,
|
||||||
origin: oqs.origin,
|
origin: oqs.origin,
|
||||||
destination: destination,
|
destination: destination,
|
||||||
client: oqs.client,
|
client: oqs.client,
|
||||||
statistics: oqs.statistics.ForServer(destination),
|
statistics: oqs.statistics.ForServer(destination),
|
||||||
incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128),
|
|
||||||
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
|
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
|
||||||
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
|
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
|
||||||
|
wakeServerCh: make(chan bool, 128),
|
||||||
retryServerCh: make(chan bool),
|
retryServerCh: make(chan bool),
|
||||||
signing: oqs.signing,
|
signing: oqs.signing,
|
||||||
}
|
}
|
||||||
|
@ -115,8 +122,18 @@ func (oqs *OutgoingQueues) SendEvent(
|
||||||
"destinations": destinations, "event": ev.EventID(),
|
"destinations": destinations, "event": ev.EventID(),
|
||||||
}).Info("Sending event")
|
}).Info("Sending event")
|
||||||
|
|
||||||
|
headeredJSON, err := json.Marshal(ev)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("json.Marshal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
nid, err := oqs.db.StoreJSON(context.TODO(), string(headeredJSON))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, destination := range destinations {
|
for _, destination := range destinations {
|
||||||
oqs.getQueue(destination).sendEvent(ev)
|
oqs.getQueue(destination).sendEvent(nid)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -19,10 +19,15 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/types"
|
"github.com/matrix-org/dendrite/federationsender/types"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database interface {
|
type Database interface {
|
||||||
internal.PartitionStorer
|
internal.PartitionStorer
|
||||||
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
|
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
|
||||||
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
|
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
|
||||||
|
StoreJSON(ctx context.Context, js string) (int64, error)
|
||||||
|
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nids []int64) error
|
||||||
|
GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error)
|
||||||
|
CleanTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
const queueJSONSchema = `
|
||||||
|
-- The federationsender_queue_json table contains event contents that
|
||||||
|
-- we failed to send.
|
||||||
|
CREATE TABLE IF NOT EXISTS federationsender_queue_json (
|
||||||
|
-- The JSON NID. This allows the federationsender_queue_retry table to
|
||||||
|
-- cross-reference to find the JSON blob.
|
||||||
|
json_nid BIGSERIAL,
|
||||||
|
-- The JSON body. Text so that we preserve UTF-8.
|
||||||
|
json_body TEXT NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const insertJSONSQL = "" +
|
||||||
|
"INSERT INTO federationsender_queue_json (json_body)" +
|
||||||
|
" VALUES ($1)" +
|
||||||
|
" RETURNING json_nid"
|
||||||
|
|
||||||
|
const deleteJSONSQL = "" +
|
||||||
|
"DELETE FROM federationsender_queue_json WHERE json_nid = ANY($1)"
|
||||||
|
|
||||||
|
const selectJSONSQL = "" +
|
||||||
|
"SELECT json_nid, json_body FROM federationsender_queue_json" +
|
||||||
|
" WHERE json_nid = ANY($1)"
|
||||||
|
|
||||||
|
type queueJSONStatements struct {
|
||||||
|
insertJSONStmt *sql.Stmt
|
||||||
|
deleteJSONStmt *sql.Stmt
|
||||||
|
selectJSONStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(queueJSONSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.deleteJSONStmt, err = db.Prepare(deleteJSONSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectJSONStmt, err = db.Prepare(selectJSONSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) insertQueueJSON(
|
||||||
|
ctx context.Context, txn *sql.Tx, json string,
|
||||||
|
) (int64, error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
|
||||||
|
var lastid int64
|
||||||
|
if err := stmt.QueryRowContext(ctx, json).Scan(&lastid); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return lastid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) deleteQueueJSON(
|
||||||
|
ctx context.Context, txn *sql.Tx, nids []int64,
|
||||||
|
) error {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt)
|
||||||
|
_, err := stmt.ExecContext(ctx, pq.Int64Array(nids))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) selectQueueJSON(
|
||||||
|
ctx context.Context, txn *sql.Tx, jsonNIDs []int64,
|
||||||
|
) (map[int64][]byte, error) {
|
||||||
|
blobs := map[int64][]byte{}
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectJSONStmt)
|
||||||
|
rows, err := stmt.QueryContext(ctx, pq.Int64Array(jsonNIDs))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed")
|
||||||
|
for rows.Next() {
|
||||||
|
var nid int64
|
||||||
|
var blob []byte
|
||||||
|
if err = rows.Scan(&nid, &blob); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
blobs[nid] = blob
|
||||||
|
}
|
||||||
|
return blobs, err
|
||||||
|
}
|
|
@ -0,0 +1,169 @@
|
||||||
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
const queuePDUsSchema = `
|
||||||
|
CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
|
||||||
|
-- The transaction ID that was generated before persisting the event.
|
||||||
|
transaction_id TEXT NOT NULL,
|
||||||
|
-- The destination server that we will send the event to.
|
||||||
|
server_name TEXT NOT NULL,
|
||||||
|
-- The JSON NID from the federationsender_queue_pdus_json table.
|
||||||
|
json_nid BIGINT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx
|
||||||
|
ON federationsender_queue_pdus (json_nid, server_name);
|
||||||
|
`
|
||||||
|
|
||||||
|
const insertQueuePDUSQL = "" +
|
||||||
|
"INSERT INTO federationsender_queue_pdus (transaction_id, server_name, json_nid)" +
|
||||||
|
" VALUES ($1, $2, $3)"
|
||||||
|
|
||||||
|
const deleteQueueTransactionPDUsSQL = "" +
|
||||||
|
"DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND transaction_id = $2"
|
||||||
|
|
||||||
|
const selectQueueNextTransactionIDSQL = "" +
|
||||||
|
"SELECT transaction_id FROM federationsender_queue_pdus" +
|
||||||
|
" WHERE server_name = $1" +
|
||||||
|
" ORDER BY transaction_id ASC" +
|
||||||
|
" LIMIT 1"
|
||||||
|
|
||||||
|
const selectQueuePDUsByTransactionSQL = "" +
|
||||||
|
"SELECT json_nid FROM federationsender_queue_pdus" +
|
||||||
|
" WHERE server_name = $1 AND transaction_id = $2" +
|
||||||
|
" LIMIT $3"
|
||||||
|
|
||||||
|
const selectQueueReferenceJSONCountSQL = "" +
|
||||||
|
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
|
||||||
|
" WHERE json_nid = $1"
|
||||||
|
|
||||||
|
type queuePDUsStatements struct {
|
||||||
|
insertQueuePDUStmt *sql.Stmt
|
||||||
|
deleteQueueTransactionPDUsStmt *sql.Stmt
|
||||||
|
selectQueueNextTransactionIDStmt *sql.Stmt
|
||||||
|
selectQueuePDUsByTransactionStmt *sql.Stmt
|
||||||
|
selectQueueReferenceJSONCountStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(queuePDUsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) insertQueuePDU(
|
||||||
|
ctx context.Context,
|
||||||
|
txn *sql.Tx,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
nid int64,
|
||||||
|
) error {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
|
||||||
|
_, err := stmt.ExecContext(
|
||||||
|
ctx,
|
||||||
|
transactionID, // the transaction ID that we initially attempted
|
||||||
|
serverName, // destination server name
|
||||||
|
nid, // JSON blob NID
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) deleteQueueTransaction(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
) error {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt)
|
||||||
|
_, err := stmt.ExecContext(ctx, serverName, transactionID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) selectQueueNextTransactionID(
|
||||||
|
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
|
||||||
|
) (gomatrixserverlib.TransactionID, error) {
|
||||||
|
var transactionID gomatrixserverlib.TransactionID
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt)
|
||||||
|
err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
return transactionID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) selectQueueReferenceJSONCount(
|
||||||
|
ctx context.Context, txn *sql.Tx, jsonNID int64,
|
||||||
|
) (int64, error) {
|
||||||
|
var count int64
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt)
|
||||||
|
err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// It's acceptable for there to be no rows referencing a given
|
||||||
|
// JSON NID but it's not an error condition. Just return as if
|
||||||
|
// there's a zero count.
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) selectQueuePDUs(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
limit int,
|
||||||
|
) ([]int64, error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
|
||||||
|
rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
|
||||||
|
var result []int64
|
||||||
|
for rows.Next() {
|
||||||
|
var nid int64
|
||||||
|
if err = rows.Scan(&nid); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, nid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
|
@ -18,15 +18,20 @@ package postgres
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/types"
|
"github.com/matrix-org/dendrite/federationsender/types"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Database stores information needed by the federation sender
|
// Database stores information needed by the federation sender
|
||||||
type Database struct {
|
type Database struct {
|
||||||
joinedHostsStatements
|
joinedHostsStatements
|
||||||
roomStatements
|
roomStatements
|
||||||
|
queuePDUsStatements
|
||||||
|
queueJSONStatements
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
}
|
}
|
||||||
|
@ -55,6 +60,14 @@ func (d *Database) prepare() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = d.queuePDUsStatements.prepare(d.db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = d.queueJSONStatements.prepare(d.db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
|
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,3 +133,125 @@ func (d *Database) GetJoinedHosts(
|
||||||
) ([]types.JoinedHost, error) {
|
) ([]types.JoinedHost, error) {
|
||||||
return d.selectJoinedHosts(ctx, roomID)
|
return d.selectJoinedHosts(ctx, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StoreJSON adds a JSON blob into the queue JSON table and returns
|
||||||
|
// a NID. The NID will then be used when inserting the per-destination
|
||||||
|
// metadata entries.
|
||||||
|
func (d *Database) StoreJSON(
|
||||||
|
ctx context.Context, js string,
|
||||||
|
) (int64, error) {
|
||||||
|
nid, err := d.insertQueueJSON(ctx, nil, js)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("d.insertQueueJSON: %w", err)
|
||||||
|
}
|
||||||
|
return nid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssociatePDUWithDestination creates an association that the
|
||||||
|
// destination queues will use to determine which JSON blobs to send
|
||||||
|
// to which servers.
|
||||||
|
func (d *Database) AssociatePDUWithDestination(
|
||||||
|
ctx context.Context,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
nids []int64,
|
||||||
|
) error {
|
||||||
|
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
for _, nid := range nids {
|
||||||
|
if err := d.insertQueuePDU(
|
||||||
|
ctx, // context
|
||||||
|
txn, // SQL transaction
|
||||||
|
transactionID, // transaction ID
|
||||||
|
serverName, // destination server name
|
||||||
|
nid, // NID from the federationsender_queue_json table
|
||||||
|
); err != nil {
|
||||||
|
return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNextTransactionPDUs retrieves events from the database for
|
||||||
|
// the next pending transaction, up to the limit specified.
|
||||||
|
func (d *Database) GetNextTransactionPDUs(
|
||||||
|
ctx context.Context,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
limit int,
|
||||||
|
) (
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
events []*gomatrixserverlib.HeaderedEvent,
|
||||||
|
err error,
|
||||||
|
) {
|
||||||
|
err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
transactionID, err = d.selectQueueNextTransactionID(ctx, txn, serverName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueueNextTransactionID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if transactionID == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, limit)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobs, err := d.selectQueueJSON(ctx, txn, nids)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectJSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, blob := range blobs {
|
||||||
|
var event gomatrixserverlib.HeaderedEvent
|
||||||
|
if err := json.Unmarshal(blob, &event); err != nil {
|
||||||
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
|
}
|
||||||
|
events = append(events, &event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// CleanTransactionPDUs cleans up all associated events for a
|
||||||
|
// given transaction. This is done when the transaction was sent
|
||||||
|
// successfully.
|
||||||
|
func (d *Database) CleanTransactionPDUs(
|
||||||
|
ctx context.Context,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
) error {
|
||||||
|
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
|
||||||
|
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var count int64
|
||||||
|
var deleteNIDs []int64
|
||||||
|
for _, nid := range nids {
|
||||||
|
count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(deleteNIDs) > 0 {
|
||||||
|
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||||
|
return fmt.Errorf("d.deleteQueueJSON: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
// Copyright 2017-2018 New Vector Ltd
|
||||||
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
const queueJSONSchema = `
|
||||||
|
-- The queue_retry_json table contains event contents that
|
||||||
|
-- we failed to send.
|
||||||
|
CREATE TABLE IF NOT EXISTS federationsender_queue_json (
|
||||||
|
-- The JSON NID. This allows the federationsender_queue_retry table to
|
||||||
|
-- cross-reference to find the JSON blob.
|
||||||
|
json_nid INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
-- The JSON body. Text so that we preserve UTF-8.
|
||||||
|
json_body TEXT NOT NULL
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const insertJSONSQL = "" +
|
||||||
|
"INSERT INTO federationsender_queue_json (json_body)" +
|
||||||
|
" VALUES ($1)"
|
||||||
|
|
||||||
|
const deleteJSONSQL = "" +
|
||||||
|
"DELETE FROM federationsender_queue_json WHERE json_nid IN ($1)"
|
||||||
|
|
||||||
|
const selectJSONSQL = "" +
|
||||||
|
"SELECT json_nid, json_body FROM federationsender_queue_json" +
|
||||||
|
" WHERE json_nid IN ($1)"
|
||||||
|
|
||||||
|
type queueJSONStatements struct {
|
||||||
|
insertJSONStmt *sql.Stmt
|
||||||
|
//deleteJSONStmt *sql.Stmt - prepared at runtime due to variadic
|
||||||
|
//selectJSONStmt *sql.Stmt - prepared at runtime due to variadic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(queueJSONSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) insertQueueJSON(
|
||||||
|
ctx context.Context, txn *sql.Tx, json string,
|
||||||
|
) (int64, error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
|
||||||
|
res, err := stmt.ExecContext(ctx, json)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("stmt.QueryContext: %w", err)
|
||||||
|
}
|
||||||
|
lastid, err := res.LastInsertId()
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("res.LastInsertId: %w", err)
|
||||||
|
}
|
||||||
|
return lastid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) deleteQueueJSON(
|
||||||
|
ctx context.Context, txn *sql.Tx, nids []int64,
|
||||||
|
) error {
|
||||||
|
deleteSQL := strings.Replace(deleteJSONSQL, "($1)", sqlutil.QueryVariadic(len(nids)), 1)
|
||||||
|
deleteStmt, err := txn.Prepare(deleteSQL)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("s.deleteQueueJSON s.db.Prepare: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
iNIDs := make([]interface{}, len(nids))
|
||||||
|
for k, v := range nids {
|
||||||
|
iNIDs[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt := sqlutil.TxStmt(txn, deleteStmt)
|
||||||
|
_, err = stmt.ExecContext(ctx, iNIDs...)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queueJSONStatements) selectQueueJSON(
|
||||||
|
ctx context.Context, txn *sql.Tx, jsonNIDs []int64,
|
||||||
|
) (map[int64][]byte, error) {
|
||||||
|
selectSQL := strings.Replace(selectJSONSQL, "($1)", sqlutil.QueryVariadic(len(jsonNIDs)), 1)
|
||||||
|
selectStmt, err := txn.Prepare(selectSQL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("s.selectQueueJSON s.db.Prepare: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
iNIDs := make([]interface{}, len(jsonNIDs))
|
||||||
|
for k, v := range jsonNIDs {
|
||||||
|
iNIDs[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
blobs := map[int64][]byte{}
|
||||||
|
stmt := sqlutil.TxStmt(txn, selectStmt)
|
||||||
|
rows, err := stmt.QueryContext(ctx, iNIDs...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("s.selectQueueJSON stmt.QueryContext: %w", err)
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed")
|
||||||
|
for rows.Next() {
|
||||||
|
var nid int64
|
||||||
|
var blob []byte
|
||||||
|
if err = rows.Scan(&nid, &blob); err != nil {
|
||||||
|
return nil, fmt.Errorf("s.selectQueueJSON rows.Scan: %w", err)
|
||||||
|
}
|
||||||
|
blobs[nid] = blob
|
||||||
|
}
|
||||||
|
return blobs, err
|
||||||
|
}
|
|
@ -0,0 +1,167 @@
|
||||||
|
// Copyright 2017-2018 New Vector Ltd
|
||||||
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
const queuePDUsSchema = `
|
||||||
|
CREATE TABLE IF NOT EXISTS federationsender_queue_pdus (
|
||||||
|
-- The transaction ID that was generated before persisting the event.
|
||||||
|
transaction_id TEXT NOT NULL,
|
||||||
|
-- The domain part of the user ID the m.room.member event is for.
|
||||||
|
server_name TEXT NOT NULL,
|
||||||
|
-- The JSON NID from the federationsender_queue_pdus_json table.
|
||||||
|
json_nid BIGINT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx
|
||||||
|
ON federationsender_queue_pdus (json_nid, server_name);
|
||||||
|
`
|
||||||
|
|
||||||
|
const insertQueuePDUSQL = "" +
|
||||||
|
"INSERT INTO federationsender_queue_pdus (transaction_id, server_name, json_nid)" +
|
||||||
|
" VALUES ($1, $2, $3)"
|
||||||
|
|
||||||
|
const deleteQueueTransactionPDUsSQL = "" +
|
||||||
|
"DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND transaction_id = $2"
|
||||||
|
|
||||||
|
const selectQueueNextTransactionIDSQL = "" +
|
||||||
|
"SELECT transaction_id FROM federationsender_queue_pdus" +
|
||||||
|
" WHERE server_name = $1" +
|
||||||
|
" ORDER BY transaction_id ASC" +
|
||||||
|
" LIMIT 1"
|
||||||
|
|
||||||
|
const selectQueuePDUsByTransactionSQL = "" +
|
||||||
|
"SELECT json_nid FROM federationsender_queue_pdus" +
|
||||||
|
" WHERE server_name = $1 AND transaction_id = $2" +
|
||||||
|
" LIMIT $3"
|
||||||
|
|
||||||
|
const selectQueueReferenceJSONCountSQL = "" +
|
||||||
|
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
|
||||||
|
" WHERE json_nid = $1"
|
||||||
|
|
||||||
|
type queuePDUsStatements struct {
|
||||||
|
insertQueuePDUStmt *sql.Stmt
|
||||||
|
deleteQueueTransactionPDUsStmt *sql.Stmt
|
||||||
|
selectQueueNextTransactionIDStmt *sql.Stmt
|
||||||
|
selectQueuePDUsByTransactionStmt *sql.Stmt
|
||||||
|
selectQueueReferenceJSONCountStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
_, err = db.Exec(queuePDUsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) insertQueuePDU(
|
||||||
|
ctx context.Context,
|
||||||
|
txn *sql.Tx,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
nid int64,
|
||||||
|
) error {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
|
||||||
|
_, err := stmt.ExecContext(
|
||||||
|
ctx,
|
||||||
|
transactionID, // the transaction ID that we initially attempted
|
||||||
|
serverName, // destination server name
|
||||||
|
nid, // JSON blob NID
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) deleteQueueTransaction(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
) error {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt)
|
||||||
|
_, err := stmt.ExecContext(ctx, serverName, transactionID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) selectQueueNextTransactionID(
|
||||||
|
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
|
||||||
|
) (gomatrixserverlib.TransactionID, error) {
|
||||||
|
var transactionID gomatrixserverlib.TransactionID
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt)
|
||||||
|
err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
return transactionID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) selectQueueReferenceJSONCount(
|
||||||
|
ctx context.Context, txn *sql.Tx, jsonNID int64,
|
||||||
|
) (int64, error) {
|
||||||
|
var count int64
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt)
|
||||||
|
err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return -1, nil
|
||||||
|
}
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *queuePDUsStatements) selectQueuePDUs(
|
||||||
|
ctx context.Context, txn *sql.Tx,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
limit int,
|
||||||
|
) ([]int64, error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt)
|
||||||
|
rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed")
|
||||||
|
var result []int64
|
||||||
|
for rows.Next() {
|
||||||
|
var nid int64
|
||||||
|
if err = rows.Scan(&nid); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, nid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, rows.Err()
|
||||||
|
}
|
|
@ -18,17 +18,22 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationsender/types"
|
"github.com/matrix-org/dendrite/federationsender/types"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Database stores information needed by the federation sender
|
// Database stores information needed by the federation sender
|
||||||
type Database struct {
|
type Database struct {
|
||||||
joinedHostsStatements
|
joinedHostsStatements
|
||||||
roomStatements
|
roomStatements
|
||||||
|
queuePDUsStatements
|
||||||
|
queueJSONStatements
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
}
|
}
|
||||||
|
@ -61,6 +66,14 @@ func (d *Database) prepare() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = d.queuePDUsStatements.prepare(d.db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = d.queueJSONStatements.prepare(d.db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
|
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,3 +139,125 @@ func (d *Database) GetJoinedHosts(
|
||||||
) ([]types.JoinedHost, error) {
|
) ([]types.JoinedHost, error) {
|
||||||
return d.selectJoinedHosts(ctx, roomID)
|
return d.selectJoinedHosts(ctx, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StoreJSON adds a JSON blob into the queue JSON table and returns
|
||||||
|
// a NID. The NID will then be used when inserting the per-destination
|
||||||
|
// metadata entries.
|
||||||
|
func (d *Database) StoreJSON(
|
||||||
|
ctx context.Context, js string,
|
||||||
|
) (int64, error) {
|
||||||
|
nid, err := d.insertQueueJSON(ctx, nil, js)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("d.insertQueueJSON: %w", err)
|
||||||
|
}
|
||||||
|
return nid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssociatePDUWithDestination creates an association that the
|
||||||
|
// destination queues will use to determine which JSON blobs to send
|
||||||
|
// to which servers.
|
||||||
|
func (d *Database) AssociatePDUWithDestination(
|
||||||
|
ctx context.Context,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
nids []int64,
|
||||||
|
) error {
|
||||||
|
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
for _, nid := range nids {
|
||||||
|
if err := d.insertQueuePDU(
|
||||||
|
ctx, // context
|
||||||
|
txn, // SQL transaction
|
||||||
|
transactionID, // transaction ID
|
||||||
|
serverName, // destination server name
|
||||||
|
nid, // NID from the federationsender_queue_json table
|
||||||
|
); err != nil {
|
||||||
|
return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNextTransactionPDUs retrieves events from the database for
|
||||||
|
// the next pending transaction, up to the limit specified.
|
||||||
|
func (d *Database) GetNextTransactionPDUs(
|
||||||
|
ctx context.Context,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
limit int,
|
||||||
|
) (
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
events []*gomatrixserverlib.HeaderedEvent,
|
||||||
|
err error,
|
||||||
|
) {
|
||||||
|
err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
transactionID, err = d.selectQueueNextTransactionID(ctx, txn, serverName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueueNextTransactionID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if transactionID == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, limit)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobs, err := d.selectQueueJSON(ctx, txn, nids)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectJSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, blob := range blobs {
|
||||||
|
var event gomatrixserverlib.HeaderedEvent
|
||||||
|
if err := json.Unmarshal(blob, &event); err != nil {
|
||||||
|
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||||
|
}
|
||||||
|
events = append(events, &event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// CleanTransactionPDUs cleans up all associated events for a
|
||||||
|
// given transaction. This is done when the transaction was sent
|
||||||
|
// successfully.
|
||||||
|
func (d *Database) CleanTransactionPDUs(
|
||||||
|
ctx context.Context,
|
||||||
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
|
) error {
|
||||||
|
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
|
||||||
|
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var count int64
|
||||||
|
var deleteNIDs []int64
|
||||||
|
for _, nid := range nids {
|
||||||
|
count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err)
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(deleteNIDs) > 0 {
|
||||||
|
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||||
|
return fmt.Errorf("d.deleteQueueJSON: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue