From 42dd96242574866378bb95d92bc0c7fdf3dbabf6 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 1 Jul 2020 11:46:38 +0100 Subject: [PATCH] Persistent federation sender queues (PDUs) (#1173) * Initial work on persistent queues * Update index for event ID and server name * Put things into database (postgres for now) * Duplicate postgres code into sqlite for now just to stop build errors, will fix SQLite soon * Fix table name * Fix index * Fix table name * Use RETURNING because LastInsertID is not supported by postgres * Use functions * Marshal headered event * Don't error on now rows * Don't block if there are PDUs waiting * Try to tidy up JSON * Debug logging * Fix query, use transactions in postgres * Clean up * Rehydrate more opportunistically * Fix SQLite * remove unused types * Review comments * Shuffle things around a bit * Clean up transaction properly * Don't send empty transactions * Reduce unnecessary retries * Count PDUs to make more resilient * Don't stop when there is work to be done * Try to limit wakeups * well this is tedious * Fix race in incomplete transactions * Thread safety on transaction ID/count --- cmd/dendrite-demo-libp2p/main.go | 3 +- cmd/dendrite-demo-yggdrasil/main.go | 3 +- federationsender/federationsender.go | 3 +- federationsender/queue/destinationqueue.go | 267 +++++++++++------- federationsender/queue/queue.go | 21 +- federationsender/storage/interface.go | 5 + .../storage/postgres/queue_json_table.go | 111 ++++++++ .../storage/postgres/queue_pdus_table.go | 169 +++++++++++ federationsender/storage/postgres/storage.go | 135 +++++++++ .../storage/sqlite3/queue_json_table.go | 132 +++++++++ .../storage/sqlite3/queue_pdus_table.go | 167 +++++++++++ federationsender/storage/sqlite3/storage.go | 135 +++++++++ 12 files changed, 1049 insertions(+), 102 deletions(-) create mode 100644 federationsender/storage/postgres/queue_json_table.go create mode 100644 federationsender/storage/postgres/queue_pdus_table.go create mode 100644 federationsender/storage/sqlite3/queue_json_table.go create mode 100644 federationsender/storage/sqlite3/queue_pdus_table.go diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index b7e86b77..4bb7a96c 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -130,8 +130,9 @@ func main() { cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName)) cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName)) + cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicrooms.db", *instanceName)) cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) + cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName)) if err = cfg.Derive(); err != nil { panic(err) } diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 5de67402..cef34c7e 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -74,7 +74,8 @@ func main() { cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName)) cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName)) + cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicrooms.db", *instanceName)) + cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName)) cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) if err = cfg.Derive(); err != nil { panic(err) diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 10ac51c8..acf52414 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -50,7 +50,8 @@ func NewInternalAPI( statistics := &types.Statistics{} queues := queue.NewOutgoingQueues( - base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{ + federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, + &queue.SigningInfo{ KeyID: base.Cfg.Matrix.KeyID, PrivateKey: base.Cfg.Matrix.PrivateKey, ServerName: base.Cfg.Matrix.ServerName, diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 4449f9e6..a736b385 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -18,8 +18,10 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" + "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" @@ -29,11 +31,14 @@ import ( "go.uber.org/atomic" ) +const maxPDUsPerTransaction = 50 + // destinationQueue is a queue of events for a single destination. // It is responsible for sending the events to the destination and // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { + db storage.Database signing *SigningInfo rsAPI api.RoomserverInternalAPI client *gomatrixserverlib.FederationClient // federation client @@ -42,13 +47,15 @@ type destinationQueue struct { running atomic.Bool // is the queue worker running? backingOff atomic.Bool // true if we're backing off statistics *types.ServerStatistics // statistics about this remote server - incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send - incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send - lastTransactionIDs []gomatrixserverlib.TransactionID // last transaction ID - pendingPDUs []*gomatrixserverlib.HeaderedEvent // owned by backgroundSend + incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send + transactionIDMutex sync.Mutex // protects transactionID + transactionID gomatrixserverlib.TransactionID // last transaction ID + transactionCount atomic.Int32 // how many events in this transaction so far + pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend + wakeServerCh chan bool // interrupts idle wait retryServerCh chan bool // interrupts backoff } @@ -79,15 +86,47 @@ func (oq *destinationQueue) retry() { // Send event adds the event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending events to that destination. -func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { +func (oq *destinationQueue) sendEvent(nid int64) { if oq.statistics.Blacklisted() { // If the destination is blacklisted then drop the event. return } + // Create a transaction ID. We'll either do this if we don't have + // one made up yet, or if we've exceeded the number of maximum + // events allowed in a single tranaction. We'll reset the counter + // when we do. + oq.transactionIDMutex.Lock() + if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction { + now := gomatrixserverlib.AsTimestamp(time.Now()) + oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount())) + oq.transactionCount.Store(0) + } + oq.transactionIDMutex.Unlock() + // Create a database entry that associates the given PDU NID with + // this destination queue. We'll then be able to retrieve the PDU + // later. + if err := oq.db.AssociatePDUWithDestination( + context.TODO(), + oq.transactionID, // the current transaction ID + oq.destination, // the destination server name + []int64{nid}, // NID from federationsender_queue_json table + ); err != nil { + log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination) + return + } + // We've successfully added a PDU to the transaction so increase + // the counter. + oq.transactionCount.Add(1) + // If the queue isn't running at this point then start it. if !oq.running.Load() { go oq.backgroundSend() } - oq.incomingPDUs <- ev + // Signal that we've sent a new PDU. This will cause the queue to + // wake up if it's asleep. The return to the Add function will only + // be 1 if the previous value was 0, e.g. nothing was waiting before. + if oq.pendingPDUs.Add(1) == 1 { + oq.wakeServerCh <- true + } } // sendEDU adds the EDU event to the pending queue for the destination. @@ -129,56 +168,48 @@ func (oq *destinationQueue) backgroundSend() { defer oq.running.Store(false) for { - // Wait either for incoming events, or until we hit an - // idle timeout. - select { - case pdu := <-oq.incomingPDUs: - // Ordering of PDUs is important so we add them to the end - // of the queue and they will all be added to transactions - // in order. - oq.pendingPDUs = append(oq.pendingPDUs, pdu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingPDUs) > 0 { - oq.pendingPDUs = append(oq.pendingPDUs, <-oq.incomingPDUs) + // If we have nothing to do then wait either for incoming events, or + // until we hit an idle timeout. + if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 { + select { + case <-oq.wakeServerCh: + // We were woken up because there are new PDUs waiting in the + // database. + case edu := <-oq.incomingEDUs: + // EDUs are handled in-memory for now. We will try to keep + // the ordering intact. + // TODO: Certain EDU types need persistence, e.g. send-to-device + oq.pendingEDUs = append(oq.pendingEDUs, edu) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingEDUs) > 0 { + oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) + } + case invite := <-oq.incomingInvites: + // There's no strict ordering requirement for invites like + // there is for transactions, so we put the invite onto the + // front of the queue. This means that if an invite that is + // stuck failing already, that it won't block our new invite + // from being sent. + oq.pendingInvites = append( + []*gomatrixserverlib.InviteV2Request{invite}, + oq.pendingInvites..., + ) + // If there are any more things waiting in the channel queue + // then read them. This is safe because we guarantee only + // having one goroutine per destination queue, so the channel + // isn't being consumed anywhere else. + for len(oq.incomingInvites) > 0 { + oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) + } + case <-time.After(time.Second * 30): + // The worker is idle so stop the goroutine. It'll get + // restarted automatically the next time we have an event to + // send. + return } - case edu := <-oq.incomingEDUs: - // Likewise for EDUs, although we should probably not try - // too hard with some EDUs (like typing notifications) after - // a certain amount of time has passed. - // TODO: think about EDU expiry some more - oq.pendingEDUs = append(oq.pendingEDUs, edu) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingEDUs) > 0 { - oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs) - } - case invite := <-oq.incomingInvites: - // There's no strict ordering requirement for invites like - // there is for transactions, so we put the invite onto the - // front of the queue. This means that if an invite that is - // stuck failing already, that it won't block our new invite - // from being sent. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{invite}, - oq.pendingInvites..., - ) - // If there are any more things waiting in the channel queue - // then read them. This is safe because we guarantee only - // having one goroutine per destination queue, so the channel - // isn't being consumed anywhere else. - for len(oq.incomingInvites) > 0 { - oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) - } - case <-time.After(time.Second * 30): - // The worker is idle so stop the goroutine. It'll - // get restarted automatically the next time we - // get an event. - return } // If we are backing off this server then wait for the @@ -193,47 +224,31 @@ func (oq *destinationQueue) backgroundSend() { oq.backingOff.Store(false) } - // How many things do we have waiting? - numPDUs := len(oq.pendingPDUs) - numEDUs := len(oq.pendingEDUs) - numInvites := len(oq.pendingInvites) - // If we have pending PDUs or EDUs then construct a transaction. - if numPDUs > 0 || numEDUs > 0 { + if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 { // Try sending the next transaction and see what happens. - transaction, terr := oq.nextTransaction(oq.pendingPDUs, oq.pendingEDUs, oq.statistics.SuccessCount()) + transaction, terr := oq.nextTransaction(oq.pendingEDUs) if terr != nil { // We failed to send the transaction. if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because - // the backoff has exceeded a maximum allowable value. + // It's been suggested that we should give up because the backoff + // has exceeded a maximum allowable value. Clean up the in-memory + // buffers at this point. The PDU clean-up is already on a defer. + oq.cleanPendingEDUs() + oq.cleanPendingInvites() return } } else if transaction { // If we successfully sent the transaction then clear out - // the pending events and EDUs. + // the pending events and EDUs, and wipe our transaction ID. oq.statistics.Success() - // Reallocate so that the underlying arrays can be GC'd, as - // opposed to growing forever. - for i := 0; i < numPDUs; i++ { - oq.pendingPDUs[i] = nil - } - for i := 0; i < numEDUs; i++ { - oq.pendingEDUs[i] = nil - } - oq.pendingPDUs = append( - []*gomatrixserverlib.HeaderedEvent{}, - oq.pendingPDUs[numPDUs:]..., - ) - oq.pendingEDUs = append( - []*gomatrixserverlib.EDU{}, - oq.pendingEDUs[numEDUs:]..., - ) + // Clean up the in-memory buffers. + oq.cleanPendingEDUs() } } // Try sending the next invite and see what happens. - if numInvites > 0 { + if len(oq.pendingInvites) > 0 { sent, ierr := oq.nextInvites(oq.pendingInvites) if ierr != nil { // We failed to send the transaction so increase the @@ -249,59 +264,117 @@ func (oq *destinationQueue) backgroundSend() { oq.statistics.Success() // Reallocate so that the underlying array can be GC'd, as // opposed to growing forever. - oq.pendingInvites = append( - []*gomatrixserverlib.InviteV2Request{}, - oq.pendingInvites[sent:]..., - ) + oq.cleanPendingInvites() } } } } +// cleanPendingEDUs cleans out the pending EDU buffer, removing +// all references so that the underlying objects can be GC'd. +func (oq *destinationQueue) cleanPendingEDUs() { + for i := 0; i < len(oq.pendingEDUs); i++ { + oq.pendingEDUs[i] = nil + } + oq.pendingEDUs = []*gomatrixserverlib.EDU{} +} + +// cleanPendingInvites cleans out the pending invite buffer, +// removing all references so that the underlying objects can +// be GC'd. +func (oq *destinationQueue) cleanPendingInvites() { + for i := 0; i < len(oq.pendingInvites); i++ { + oq.pendingInvites[i] = nil + } + oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{} +} + // nextTransaction creates a new transaction from the pending event // queue and sends it. Returns true if a transaction was sent or // false otherwise. func (oq *destinationQueue) nextTransaction( - pendingPDUs []*gomatrixserverlib.HeaderedEvent, pendingEDUs []*gomatrixserverlib.EDU, - sentCounter uint32, ) (bool, error) { + // Before we do anything, we need to roll over the transaction + // ID that is being used to coalesce events into the next TX. + // Otherwise it's possible that we'll pick up an incomplete + // transaction and end up nuking the rest of the events at the + // cleanup stage. + oq.transactionIDMutex.Lock() + oq.transactionID = "" + oq.transactionIDMutex.Unlock() + oq.transactionCount.Store(0) + + // Create the transaction. t := gomatrixserverlib.Transaction{ PDUs: []json.RawMessage{}, EDUs: []gomatrixserverlib.EDU{}, } - now := gomatrixserverlib.AsTimestamp(time.Now()) - t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, sentCounter)) t.Origin = oq.origin t.Destination = oq.destination - t.OriginServerTS = now - t.PreviousIDs = oq.lastTransactionIDs - if t.PreviousIDs == nil { - t.PreviousIDs = []gomatrixserverlib.TransactionID{} + t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now()) + + // Ask the database for any pending PDUs from the next transaction. + // maxPDUsPerTransaction is an upper limit but we probably won't + // actually retrieve that many events. + txid, pdus, err := oq.db.GetNextTransactionPDUs( + context.TODO(), // context + oq.destination, // server name + maxPDUsPerTransaction, // max events to retrieve + ) + if err != nil { + log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination) + return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err) } - oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + // If we didn't get anything from the database and there are no + // pending EDUs then there's nothing to do - stop here. + if len(pdus) == 0 && len(pendingEDUs) == 0 { + return false, nil + } - for _, pdu := range pendingPDUs { + // Pick out the transaction ID from the database. If we didn't + // get a transaction ID (i.e. because there are no PDUs but only + // EDUs) then generate a transaction ID. + t.TransactionID = txid + if t.TransactionID == "" { + now := gomatrixserverlib.AsTimestamp(time.Now()) + t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount())) + } + + // Go through PDUs that we retrieved from the database, if any, + // and add them into the transaction. + for _, pdu := range pdus { // Append the JSON of the event, since this is a json.RawMessage type in the // gomatrixserverlib.Transaction struct t.PDUs = append(t.PDUs, (*pdu).JSON()) } + // Do the same for pending EDUS in the queue. for _, edu := range pendingEDUs { t.EDUs = append(t.EDUs, *edu) } logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) + // Try to send the transaction to the destination server. // TODO: we should check for 500-ish fails vs 400-ish here, // since we shouldn't queue things indefinitely in response // to a 400-ish error - _, err := oq.client.SendTransaction(context.TODO(), t) + _, err = oq.client.SendTransaction(context.TODO(), t) switch e := err.(type) { case nil: // No error was returned so the transaction looks to have // been successfully sent. + oq.pendingPDUs.Sub(int32(len(t.PDUs))) + // Clean up the transaction in the database. + if err = oq.db.CleanTransactionPDUs( + context.TODO(), + t.Destination, + t.TransactionID, + ); err != nil { + log.WithError(err).Errorf("failed to clean transaction %q for server %q", t.TransactionID, t.Destination) + } return true, nil case gomatrix.HTTPError: // We received a HTTP error back. In this instance we only diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 24034355..492d5f55 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -15,10 +15,13 @@ package queue import ( + "context" "crypto/ed25519" + "encoding/json" "fmt" "sync" + "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -29,6 +32,7 @@ import ( // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { + db storage.Database rsAPI api.RoomserverInternalAPI origin gomatrixserverlib.ServerName client *gomatrixserverlib.FederationClient @@ -40,6 +44,7 @@ type OutgoingQueues struct { // NewOutgoingQueues makes a new OutgoingQueues func NewOutgoingQueues( + db storage.Database, origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient, rsAPI api.RoomserverInternalAPI, @@ -47,6 +52,7 @@ func NewOutgoingQueues( signing *SigningInfo, ) *OutgoingQueues { return &OutgoingQueues{ + db: db, rsAPI: rsAPI, origin: origin, client: client, @@ -76,14 +82,15 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + db: oqs.db, rsAPI: oqs.rsAPI, origin: oqs.origin, destination: destination, client: oqs.client, statistics: oqs.statistics.ForServer(destination), - incomingPDUs: make(chan *gomatrixserverlib.HeaderedEvent, 128), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), + wakeServerCh: make(chan bool, 128), retryServerCh: make(chan bool), signing: oqs.signing, } @@ -115,8 +122,18 @@ func (oqs *OutgoingQueues) SendEvent( "destinations": destinations, "event": ev.EventID(), }).Info("Sending event") + headeredJSON, err := json.Marshal(ev) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + nid, err := oqs.db.StoreJSON(context.TODO(), string(headeredJSON)) + if err != nil { + return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err) + } + for _, destination := range destinations { - oqs.getQueue(destination).sendEvent(ev) + oqs.getQueue(destination).sendEvent(nid) } return nil diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index be195382..f4df93fa 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -19,10 +19,15 @@ import ( "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/gomatrixserverlib" ) type Database interface { internal.PartitionStorer UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) + StoreJSON(ctx context.Context, js string) (int64, error) + AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nids []int64) error + GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error) + CleanTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID) error } diff --git a/federationsender/storage/postgres/queue_json_table.go b/federationsender/storage/postgres/queue_json_table.go new file mode 100644 index 00000000..eac2ea98 --- /dev/null +++ b/federationsender/storage/postgres/queue_json_table.go @@ -0,0 +1,111 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +const queueJSONSchema = ` +-- The federationsender_queue_json table contains event contents that +-- we failed to send. +CREATE TABLE IF NOT EXISTS federationsender_queue_json ( + -- The JSON NID. This allows the federationsender_queue_retry table to + -- cross-reference to find the JSON blob. + json_nid BIGSERIAL, + -- The JSON body. Text so that we preserve UTF-8. + json_body TEXT NOT NULL +); +` + +const insertJSONSQL = "" + + "INSERT INTO federationsender_queue_json (json_body)" + + " VALUES ($1)" + + " RETURNING json_nid" + +const deleteJSONSQL = "" + + "DELETE FROM federationsender_queue_json WHERE json_nid = ANY($1)" + +const selectJSONSQL = "" + + "SELECT json_nid, json_body FROM federationsender_queue_json" + + " WHERE json_nid = ANY($1)" + +type queueJSONStatements struct { + insertJSONStmt *sql.Stmt + deleteJSONStmt *sql.Stmt + selectJSONStmt *sql.Stmt +} + +func (s *queueJSONStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queueJSONSchema) + if err != nil { + return + } + if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil { + return + } + if s.deleteJSONStmt, err = db.Prepare(deleteJSONSQL); err != nil { + return + } + if s.selectJSONStmt, err = db.Prepare(selectJSONSQL); err != nil { + return + } + return +} + +func (s *queueJSONStatements) insertQueueJSON( + ctx context.Context, txn *sql.Tx, json string, +) (int64, error) { + stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) + var lastid int64 + if err := stmt.QueryRowContext(ctx, json).Scan(&lastid); err != nil { + return 0, err + } + return lastid, nil +} + +func (s *queueJSONStatements) deleteQueueJSON( + ctx context.Context, txn *sql.Tx, nids []int64, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteJSONStmt) + _, err := stmt.ExecContext(ctx, pq.Int64Array(nids)) + return err +} + +func (s *queueJSONStatements) selectQueueJSON( + ctx context.Context, txn *sql.Tx, jsonNIDs []int64, +) (map[int64][]byte, error) { + blobs := map[int64][]byte{} + stmt := sqlutil.TxStmt(txn, s.selectJSONStmt) + rows, err := stmt.QueryContext(ctx, pq.Int64Array(jsonNIDs)) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed") + for rows.Next() { + var nid int64 + var blob []byte + if err = rows.Scan(&nid, &blob); err != nil { + return nil, err + } + blobs[nid] = blob + } + return blobs, err +} diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationsender/storage/postgres/queue_pdus_table.go new file mode 100644 index 00000000..ef7a9f41 --- /dev/null +++ b/federationsender/storage/postgres/queue_pdus_table.go @@ -0,0 +1,169 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const queuePDUsSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_queue_pdus ( + -- The transaction ID that was generated before persisting the event. + transaction_id TEXT NOT NULL, + -- The destination server that we will send the event to. + server_name TEXT NOT NULL, + -- The JSON NID from the federationsender_queue_pdus_json table. + json_nid BIGINT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx + ON federationsender_queue_pdus (json_nid, server_name); +` + +const insertQueuePDUSQL = "" + + "INSERT INTO federationsender_queue_pdus (transaction_id, server_name, json_nid)" + + " VALUES ($1, $2, $3)" + +const deleteQueueTransactionPDUsSQL = "" + + "DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND transaction_id = $2" + +const selectQueueNextTransactionIDSQL = "" + + "SELECT transaction_id FROM federationsender_queue_pdus" + + " WHERE server_name = $1" + + " ORDER BY transaction_id ASC" + + " LIMIT 1" + +const selectQueuePDUsByTransactionSQL = "" + + "SELECT json_nid FROM federationsender_queue_pdus" + + " WHERE server_name = $1 AND transaction_id = $2" + + " LIMIT $3" + +const selectQueueReferenceJSONCountSQL = "" + + "SELECT COUNT(*) FROM federationsender_queue_pdus" + + " WHERE json_nid = $1" + +type queuePDUsStatements struct { + insertQueuePDUStmt *sql.Stmt + deleteQueueTransactionPDUsStmt *sql.Stmt + selectQueueNextTransactionIDStmt *sql.Stmt + selectQueuePDUsByTransactionStmt *sql.Stmt + selectQueueReferenceJSONCountStmt *sql.Stmt +} + +func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queuePDUsSchema) + if err != nil { + return + } + if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil { + return + } + if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil { + return + } + if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil { + return + } + if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil { + return + } + if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil { + return + } + return +} + +func (s *queuePDUsStatements) insertQueuePDU( + ctx context.Context, + txn *sql.Tx, + transactionID gomatrixserverlib.TransactionID, + serverName gomatrixserverlib.ServerName, + nid int64, +) error { + stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) + _, err := stmt.ExecContext( + ctx, + transactionID, // the transaction ID that we initially attempted + serverName, // destination server name + nid, // JSON blob NID + ) + return err +} + +func (s *queuePDUsStatements) deleteQueueTransaction( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt) + _, err := stmt.ExecContext(ctx, serverName, transactionID) + return err +} + +func (s *queuePDUsStatements) selectQueueNextTransactionID( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, +) (gomatrixserverlib.TransactionID, error) { + var transactionID gomatrixserverlib.TransactionID + stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt) + err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) + if err == sql.ErrNoRows { + return "", nil + } + return transactionID, err +} + +func (s *queuePDUsStatements) selectQueueReferenceJSONCount( + ctx context.Context, txn *sql.Tx, jsonNID int64, +) (int64, error) { + var count int64 + stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt) + err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) + if err == sql.ErrNoRows { + // It's acceptable for there to be no rows referencing a given + // JSON NID but it's not an error condition. Just return as if + // there's a zero count. + return 0, nil + } + return count, err +} + +func (s *queuePDUsStatements) selectQueuePDUs( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, + limit int, +) ([]int64, error) { + stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) + rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") + var result []int64 + for rows.Next() { + var nid int64 + if err = rows.Scan(&nid); err != nil { + return nil, err + } + result = append(result, nid) + } + + return result, rows.Err() +} diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index 8fd4c11a..18d1532a 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -18,15 +18,20 @@ package postgres import ( "context" "database/sql" + "encoding/json" + "fmt" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" ) // Database stores information needed by the federation sender type Database struct { joinedHostsStatements roomStatements + queuePDUsStatements + queueJSONStatements sqlutil.PartitionOffsetStatements db *sql.DB } @@ -55,6 +60,14 @@ func (d *Database) prepare() error { return err } + if err = d.queuePDUsStatements.prepare(d.db); err != nil { + return err + } + + if err = d.queueJSONStatements.prepare(d.db); err != nil { + return err + } + return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") } @@ -120,3 +133,125 @@ func (d *Database) GetJoinedHosts( ) ([]types.JoinedHost, error) { return d.selectJoinedHosts(ctx, roomID) } + +// StoreJSON adds a JSON blob into the queue JSON table and returns +// a NID. The NID will then be used when inserting the per-destination +// metadata entries. +func (d *Database) StoreJSON( + ctx context.Context, js string, +) (int64, error) { + nid, err := d.insertQueueJSON(ctx, nil, js) + if err != nil { + return 0, fmt.Errorf("d.insertQueueJSON: %w", err) + } + return nid, nil +} + +// AssociatePDUWithDestination creates an association that the +// destination queues will use to determine which JSON blobs to send +// to which servers. +func (d *Database) AssociatePDUWithDestination( + ctx context.Context, + transactionID gomatrixserverlib.TransactionID, + serverName gomatrixserverlib.ServerName, + nids []int64, +) error { + return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { + for _, nid := range nids { + if err := d.insertQueuePDU( + ctx, // context + txn, // SQL transaction + transactionID, // transaction ID + serverName, // destination server name + nid, // NID from the federationsender_queue_json table + ); err != nil { + return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err) + } + } + return nil + }) +} + +// GetNextTransactionPDUs retrieves events from the database for +// the next pending transaction, up to the limit specified. +func (d *Database) GetNextTransactionPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, + limit int, +) ( + transactionID gomatrixserverlib.TransactionID, + events []*gomatrixserverlib.HeaderedEvent, + err error, +) { + err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { + transactionID, err = d.selectQueueNextTransactionID(ctx, txn, serverName) + if err != nil { + return fmt.Errorf("d.selectQueueNextTransactionID: %w", err) + } + + if transactionID == "" { + return nil + } + + nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, limit) + if err != nil { + return fmt.Errorf("d.selectQueuePDUs: %w", err) + } + + blobs, err := d.selectQueueJSON(ctx, txn, nids) + if err != nil { + return fmt.Errorf("d.selectJSON: %w", err) + } + + for _, blob := range blobs { + var event gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(blob, &event); err != nil { + return fmt.Errorf("json.Unmarshal: %w", err) + } + events = append(events, &event) + } + + return nil + }) + return +} + +// CleanTransactionPDUs cleans up all associated events for a +// given transaction. This is done when the transaction was sent +// successfully. +func (d *Database) CleanTransactionPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, +) error { + return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { + nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50) + if err != nil { + return fmt.Errorf("d.selectQueuePDUs: %w", err) + } + + if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil { + return fmt.Errorf("d.deleteQueueTransaction: %w", err) + } + + var count int64 + var deleteNIDs []int64 + for _, nid := range nids { + count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid) + if err != nil { + return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) + } + if count == 0 { + deleteNIDs = append(deleteNIDs, nid) + } + } + + if len(deleteNIDs) > 0 { + if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil { + return fmt.Errorf("d.deleteQueueJSON: %w", err) + } + } + + return nil + }) +} diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationsender/storage/sqlite3/queue_json_table.go new file mode 100644 index 00000000..01b7160d --- /dev/null +++ b/federationsender/storage/sqlite3/queue_json_table.go @@ -0,0 +1,132 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +const queueJSONSchema = ` +-- The queue_retry_json table contains event contents that +-- we failed to send. +CREATE TABLE IF NOT EXISTS federationsender_queue_json ( + -- The JSON NID. This allows the federationsender_queue_retry table to + -- cross-reference to find the JSON blob. + json_nid INTEGER PRIMARY KEY AUTOINCREMENT, + -- The JSON body. Text so that we preserve UTF-8. + json_body TEXT NOT NULL +); +` + +const insertJSONSQL = "" + + "INSERT INTO federationsender_queue_json (json_body)" + + " VALUES ($1)" + +const deleteJSONSQL = "" + + "DELETE FROM federationsender_queue_json WHERE json_nid IN ($1)" + +const selectJSONSQL = "" + + "SELECT json_nid, json_body FROM federationsender_queue_json" + + " WHERE json_nid IN ($1)" + +type queueJSONStatements struct { + insertJSONStmt *sql.Stmt + //deleteJSONStmt *sql.Stmt - prepared at runtime due to variadic + //selectJSONStmt *sql.Stmt - prepared at runtime due to variadic +} + +func (s *queueJSONStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queueJSONSchema) + if err != nil { + return + } + if s.insertJSONStmt, err = db.Prepare(insertJSONSQL); err != nil { + return + } + return +} + +func (s *queueJSONStatements) insertQueueJSON( + ctx context.Context, txn *sql.Tx, json string, +) (int64, error) { + stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) + res, err := stmt.ExecContext(ctx, json) + if err != nil { + return 0, fmt.Errorf("stmt.QueryContext: %w", err) + } + lastid, err := res.LastInsertId() + if err != nil { + return 0, fmt.Errorf("res.LastInsertId: %w", err) + } + return lastid, nil +} + +func (s *queueJSONStatements) deleteQueueJSON( + ctx context.Context, txn *sql.Tx, nids []int64, +) error { + deleteSQL := strings.Replace(deleteJSONSQL, "($1)", sqlutil.QueryVariadic(len(nids)), 1) + deleteStmt, err := txn.Prepare(deleteSQL) + if err != nil { + return fmt.Errorf("s.deleteQueueJSON s.db.Prepare: %w", err) + } + + iNIDs := make([]interface{}, len(nids)) + for k, v := range nids { + iNIDs[k] = v + } + + stmt := sqlutil.TxStmt(txn, deleteStmt) + _, err = stmt.ExecContext(ctx, iNIDs...) + return err +} + +func (s *queueJSONStatements) selectQueueJSON( + ctx context.Context, txn *sql.Tx, jsonNIDs []int64, +) (map[int64][]byte, error) { + selectSQL := strings.Replace(selectJSONSQL, "($1)", sqlutil.QueryVariadic(len(jsonNIDs)), 1) + selectStmt, err := txn.Prepare(selectSQL) + if err != nil { + return nil, fmt.Errorf("s.selectQueueJSON s.db.Prepare: %w", err) + } + + iNIDs := make([]interface{}, len(jsonNIDs)) + for k, v := range jsonNIDs { + iNIDs[k] = v + } + + blobs := map[int64][]byte{} + stmt := sqlutil.TxStmt(txn, selectStmt) + rows, err := stmt.QueryContext(ctx, iNIDs...) + if err != nil { + return nil, fmt.Errorf("s.selectQueueJSON stmt.QueryContext: %w", err) + } + defer internal.CloseAndLogIfError(ctx, rows, "selectJSON: rows.close() failed") + for rows.Next() { + var nid int64 + var blob []byte + if err = rows.Scan(&nid, &blob); err != nil { + return nil, fmt.Errorf("s.selectQueueJSON rows.Scan: %w", err) + } + blobs[nid] = blob + } + return blobs, err +} diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go new file mode 100644 index 00000000..dc08fd70 --- /dev/null +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -0,0 +1,167 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const queuePDUsSchema = ` +CREATE TABLE IF NOT EXISTS federationsender_queue_pdus ( + -- The transaction ID that was generated before persisting the event. + transaction_id TEXT NOT NULL, + -- The domain part of the user ID the m.room.member event is for. + server_name TEXT NOT NULL, + -- The JSON NID from the federationsender_queue_pdus_json table. + json_nid BIGINT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_pdus_pdus_json_nid_idx + ON federationsender_queue_pdus (json_nid, server_name); +` + +const insertQueuePDUSQL = "" + + "INSERT INTO federationsender_queue_pdus (transaction_id, server_name, json_nid)" + + " VALUES ($1, $2, $3)" + +const deleteQueueTransactionPDUsSQL = "" + + "DELETE FROM federationsender_queue_pdus WHERE server_name = $1 AND transaction_id = $2" + +const selectQueueNextTransactionIDSQL = "" + + "SELECT transaction_id FROM federationsender_queue_pdus" + + " WHERE server_name = $1" + + " ORDER BY transaction_id ASC" + + " LIMIT 1" + +const selectQueuePDUsByTransactionSQL = "" + + "SELECT json_nid FROM federationsender_queue_pdus" + + " WHERE server_name = $1 AND transaction_id = $2" + + " LIMIT $3" + +const selectQueueReferenceJSONCountSQL = "" + + "SELECT COUNT(*) FROM federationsender_queue_pdus" + + " WHERE json_nid = $1" + +type queuePDUsStatements struct { + insertQueuePDUStmt *sql.Stmt + deleteQueueTransactionPDUsStmt *sql.Stmt + selectQueueNextTransactionIDStmt *sql.Stmt + selectQueuePDUsByTransactionStmt *sql.Stmt + selectQueueReferenceJSONCountStmt *sql.Stmt +} + +func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(queuePDUsSchema) + if err != nil { + return + } + if s.insertQueuePDUStmt, err = db.Prepare(insertQueuePDUSQL); err != nil { + return + } + if s.deleteQueueTransactionPDUsStmt, err = db.Prepare(deleteQueueTransactionPDUsSQL); err != nil { + return + } + if s.selectQueueNextTransactionIDStmt, err = db.Prepare(selectQueueNextTransactionIDSQL); err != nil { + return + } + if s.selectQueuePDUsByTransactionStmt, err = db.Prepare(selectQueuePDUsByTransactionSQL); err != nil { + return + } + if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil { + return + } + return +} + +func (s *queuePDUsStatements) insertQueuePDU( + ctx context.Context, + txn *sql.Tx, + transactionID gomatrixserverlib.TransactionID, + serverName gomatrixserverlib.ServerName, + nid int64, +) error { + stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) + _, err := stmt.ExecContext( + ctx, + transactionID, // the transaction ID that we initially attempted + serverName, // destination server name + nid, // JSON blob NID + ) + return err +} + +func (s *queuePDUsStatements) deleteQueueTransaction( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteQueueTransactionPDUsStmt) + _, err := stmt.ExecContext(ctx, serverName, transactionID) + return err +} + +func (s *queuePDUsStatements) selectQueueNextTransactionID( + ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, +) (gomatrixserverlib.TransactionID, error) { + var transactionID gomatrixserverlib.TransactionID + stmt := sqlutil.TxStmt(txn, s.selectQueueNextTransactionIDStmt) + err := stmt.QueryRowContext(ctx, serverName).Scan(&transactionID) + if err == sql.ErrNoRows { + return "", nil + } + return transactionID, err +} + +func (s *queuePDUsStatements) selectQueueReferenceJSONCount( + ctx context.Context, txn *sql.Tx, jsonNID int64, +) (int64, error) { + var count int64 + stmt := sqlutil.TxStmt(txn, s.selectQueueReferenceJSONCountStmt) + err := stmt.QueryRowContext(ctx, jsonNID).Scan(&count) + if err == sql.ErrNoRows { + return -1, nil + } + return count, err +} + +func (s *queuePDUsStatements) selectQueuePDUs( + ctx context.Context, txn *sql.Tx, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, + limit int, +) ([]int64, error) { + stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsByTransactionStmt) + rows, err := stmt.QueryContext(ctx, serverName, transactionID, limit) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "queueFromStmt: rows.close() failed") + var result []int64 + for rows.Next() { + var nid int64 + if err = rows.Scan(&nid); err != nil { + return nil, err + } + result = append(result, nid) + } + + return result, rows.Err() +} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index ac303f64..7629ecd2 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -18,17 +18,22 @@ package sqlite3 import ( "context" "database/sql" + "encoding/json" + "fmt" _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/gomatrixserverlib" ) // Database stores information needed by the federation sender type Database struct { joinedHostsStatements roomStatements + queuePDUsStatements + queueJSONStatements sqlutil.PartitionOffsetStatements db *sql.DB } @@ -61,6 +66,14 @@ func (d *Database) prepare() error { return err } + if err = d.queuePDUsStatements.prepare(d.db); err != nil { + return err + } + + if err = d.queueJSONStatements.prepare(d.db); err != nil { + return err + } + return d.PartitionOffsetStatements.Prepare(d.db, "federationsender") } @@ -126,3 +139,125 @@ func (d *Database) GetJoinedHosts( ) ([]types.JoinedHost, error) { return d.selectJoinedHosts(ctx, roomID) } + +// StoreJSON adds a JSON blob into the queue JSON table and returns +// a NID. The NID will then be used when inserting the per-destination +// metadata entries. +func (d *Database) StoreJSON( + ctx context.Context, js string, +) (int64, error) { + nid, err := d.insertQueueJSON(ctx, nil, js) + if err != nil { + return 0, fmt.Errorf("d.insertQueueJSON: %w", err) + } + return nid, nil +} + +// AssociatePDUWithDestination creates an association that the +// destination queues will use to determine which JSON blobs to send +// to which servers. +func (d *Database) AssociatePDUWithDestination( + ctx context.Context, + transactionID gomatrixserverlib.TransactionID, + serverName gomatrixserverlib.ServerName, + nids []int64, +) error { + return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { + for _, nid := range nids { + if err := d.insertQueuePDU( + ctx, // context + txn, // SQL transaction + transactionID, // transaction ID + serverName, // destination server name + nid, // NID from the federationsender_queue_json table + ); err != nil { + return fmt.Errorf("d.insertQueueRetryStmt.ExecContext: %w", err) + } + } + return nil + }) +} + +// GetNextTransactionPDUs retrieves events from the database for +// the next pending transaction, up to the limit specified. +func (d *Database) GetNextTransactionPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, + limit int, +) ( + transactionID gomatrixserverlib.TransactionID, + events []*gomatrixserverlib.HeaderedEvent, + err error, +) { + err = sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { + transactionID, err = d.selectQueueNextTransactionID(ctx, txn, serverName) + if err != nil { + return fmt.Errorf("d.selectQueueNextTransactionID: %w", err) + } + + if transactionID == "" { + return nil + } + + nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, limit) + if err != nil { + return fmt.Errorf("d.selectQueuePDUs: %w", err) + } + + blobs, err := d.selectQueueJSON(ctx, txn, nids) + if err != nil { + return fmt.Errorf("d.selectJSON: %w", err) + } + + for _, blob := range blobs { + var event gomatrixserverlib.HeaderedEvent + if err := json.Unmarshal(blob, &event); err != nil { + return fmt.Errorf("json.Unmarshal: %w", err) + } + events = append(events, &event) + } + + return nil + }) + return +} + +// CleanTransactionPDUs cleans up all associated events for a +// given transaction. This is done when the transaction was sent +// successfully. +func (d *Database) CleanTransactionPDUs( + ctx context.Context, + serverName gomatrixserverlib.ServerName, + transactionID gomatrixserverlib.TransactionID, +) error { + return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error { + nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50) + if err != nil { + return fmt.Errorf("d.selectQueuePDUs: %w", err) + } + + if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil { + return fmt.Errorf("d.deleteQueueTransaction: %w", err) + } + + var count int64 + var deleteNIDs []int64 + for _, nid := range nids { + count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid) + if err != nil { + return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) + } + if count == 0 { + deleteNIDs = append(deleteNIDs, nid) + } + } + + if len(deleteNIDs) > 0 { + if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil { + return fmt.Errorf("d.deleteQueueJSON: %w", err) + } + } + + return nil + }) +}