From 79c5485c8d29b103f2e4bc7f55d8d902bccc3d3a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 24 May 2021 11:43:24 +0100 Subject: [PATCH] Allow clearing federation blacklist at startup for P2P demos --- build/gobind-pinecone/monolith.go | 2 +- build/gobind-yggdrasil/monolith.go | 2 +- cmd/dendrite-demo-libp2p/main.go | 2 +- cmd/dendrite-demo-pinecone/main.go | 2 +- cmd/dendrite-demo-yggdrasil/main.go | 2 +- cmd/dendrite-monolith-server/main.go | 2 +- .../personalities/federationsender.go | 2 +- cmd/dendritejs/main.go | 2 +- federationsender/federationsender.go | 5 ++++ federationsender/internal/perform.go | 1 + federationsender/storage/interface.go | 1 + .../storage/postgres/blacklist_table.go | 30 ++++++++++++------- federationsender/storage/shared/storage.go | 6 ++++ .../storage/sqlite3/blacklist_table.go | 30 ++++++++++++------- federationsender/storage/tables/interface.go | 1 + 15 files changed, 60 insertions(+), 30 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index e6934095..c99ed0b0 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -291,7 +291,7 @@ func (m *DendriteMonolith) Start() { ) fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, keyRing, true, ) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 332d156b..eabea549 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -118,7 +118,7 @@ func (m *DendriteMonolith) Start() { ) fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, keyRing, true, ) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 0610ec77..cc7dcf02 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -166,7 +166,7 @@ func main() { asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationsender.NewInternalAPI( - &base.Base, federation, rsAPI, keyRing, + &base.Base, federation, rsAPI, keyRing, true, ) rsAPI.SetFederationSenderAPI(fsAPI) provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index cd2832e1..7851fdb1 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -180,7 +180,7 @@ func main() { ) rsAPI := rsComponent fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, keyRing, true, ) keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 2a4a335a..2d710ae7 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -114,7 +114,7 @@ func main() { asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, keyRing, true, ) ygg.SetSessionFunc(func(address string) { diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index b82f7321..ef349505 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -99,7 +99,7 @@ func main() { } fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, keyRing, false, ) if base.UseHTTPAPIs { federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) diff --git a/cmd/dendrite-polylith-multi/personalities/federationsender.go b/cmd/dendrite-polylith-multi/personalities/federationsender.go index f8b6d300..ca9a8add 100644 --- a/cmd/dendrite-polylith-multi/personalities/federationsender.go +++ b/cmd/dendrite-polylith-multi/personalities/federationsender.go @@ -28,7 +28,7 @@ func FederationSender(base *setup.BaseDendrite, cfg *config.Dendrite) { rsAPI := base.RoomserverHTTPClient() fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, keyRing, false, ) federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 0dfa4681..7ece94ff 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -210,7 +210,7 @@ func main() { base, userAPI, rsAPI, ) rsAPI.SetAppserviceAPI(asQuery) - fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) + fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing, true) rsAPI.SetFederationSenderAPI(fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 9aab91d4..0732c5d3 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -43,6 +43,7 @@ func NewInternalAPI( federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, keyRing *gomatrixserverlib.KeyRing, + resetBlacklist bool, ) api.FederationSenderInternalAPI { cfg := &base.Cfg.FederationSender @@ -51,6 +52,10 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to connect to federation sender db") } + if resetBlacklist { + _ = federationSenderDB.RemoveAllServersFromBlacklist() + } + stats := &statistics.Statistics{ DB: federationSenderDB, FailuresUntilBlacklist: cfg.FederationMaxRetries, diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 6a2531a0..968df247 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -572,6 +572,7 @@ func (r *FederationSenderInternalAPI) PerformServersAlive( response *api.PerformServersAliveResponse, ) (err error) { for _, srv := range request.Servers { + _ = r.db.RemoveServerFromBlacklist(srv) r.queues.RetryServer(srv) } diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go index b8361304..9c5ac004 100644 --- a/federationsender/storage/interface.go +++ b/federationsender/storage/interface.go @@ -54,6 +54,7 @@ type Database interface { // these don't have contexts passed in as we want things to happen regardless of the request context AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error + RemoveAllServersFromBlacklist() error IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error diff --git a/federationsender/storage/postgres/blacklist_table.go b/federationsender/storage/postgres/blacklist_table.go index f92c59e5..eef37318 100644 --- a/federationsender/storage/postgres/blacklist_table.go +++ b/federationsender/storage/postgres/blacklist_table.go @@ -40,11 +40,15 @@ const selectBlacklistSQL = "" + const deleteBlacklistSQL = "" + "DELETE FROM federationsender_blacklist WHERE server_name = $1" +const deleteAllBlacklistSQL = "" + + "TRUNCATE federationsender_blacklist" + type blacklistStatements struct { - db *sql.DB - insertBlacklistStmt *sql.Stmt - selectBlacklistStmt *sql.Stmt - deleteBlacklistStmt *sql.Stmt + db *sql.DB + insertBlacklistStmt *sql.Stmt + selectBlacklistStmt *sql.Stmt + deleteBlacklistStmt *sql.Stmt + deleteAllBlacklistStmt *sql.Stmt } func NewPostgresBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { @@ -65,11 +69,12 @@ func NewPostgresBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { if s.deleteBlacklistStmt, err = db.Prepare(deleteBlacklistSQL); err != nil { return } + if s.deleteAllBlacklistStmt, err = db.Prepare(deleteAllBlacklistSQL); err != nil { + return + } return } -// insertRoom inserts the room if it didn't already exist. -// If the room didn't exist then last_event_id is set to the empty string. func (s *blacklistStatements) InsertBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) error { @@ -78,9 +83,6 @@ func (s *blacklistStatements) InsertBlacklist( return err } -// selectRoomForUpdate locks the row for the room and returns the last_event_id. -// The row must already exist in the table. Callers can ensure that the row -// exists by calling insertRoom first. func (s *blacklistStatements) SelectBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) (bool, error) { @@ -96,8 +98,6 @@ func (s *blacklistStatements) SelectBlacklist( return res.Next(), nil } -// updateRoom updates the last_event_id for the room. selectRoomForUpdate should -// have already been called earlier within the transaction. func (s *blacklistStatements) DeleteBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) error { @@ -105,3 +105,11 @@ func (s *blacklistStatements) DeleteBlacklist( _, err := stmt.ExecContext(ctx, serverName) return err } + +func (s *blacklistStatements) DeleteAllBlacklist( + ctx context.Context, txn *sql.Tx, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteAllBlacklistStmt) + _, err := stmt.ExecContext(ctx) + return err +} diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 2e74e9d6..2d409922 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -148,6 +148,12 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server }) } +func (d *Database) RemoveAllServersFromBlacklist() error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + return d.FederationSenderBlacklist.DeleteAllBlacklist(context.TODO(), txn) + }) +} + func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) { return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) } diff --git a/federationsender/storage/sqlite3/blacklist_table.go b/federationsender/storage/sqlite3/blacklist_table.go index 90b44ac9..2694e630 100644 --- a/federationsender/storage/sqlite3/blacklist_table.go +++ b/federationsender/storage/sqlite3/blacklist_table.go @@ -40,11 +40,15 @@ const selectBlacklistSQL = "" + const deleteBlacklistSQL = "" + "DELETE FROM federationsender_blacklist WHERE server_name = $1" +const deleteAllBlacklistSQL = "" + + "DELETE FROM federationsender_blacklist" + type blacklistStatements struct { - db *sql.DB - insertBlacklistStmt *sql.Stmt - selectBlacklistStmt *sql.Stmt - deleteBlacklistStmt *sql.Stmt + db *sql.DB + insertBlacklistStmt *sql.Stmt + selectBlacklistStmt *sql.Stmt + deleteBlacklistStmt *sql.Stmt + deleteAllBlacklistStmt *sql.Stmt } func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { @@ -65,11 +69,12 @@ func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { if s.deleteBlacklistStmt, err = db.Prepare(deleteBlacklistSQL); err != nil { return } + if s.deleteAllBlacklistStmt, err = db.Prepare(deleteAllBlacklistSQL); err != nil { + return + } return } -// insertRoom inserts the room if it didn't already exist. -// If the room didn't exist then last_event_id is set to the empty string. func (s *blacklistStatements) InsertBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) error { @@ -78,9 +83,6 @@ func (s *blacklistStatements) InsertBlacklist( return err } -// selectRoomForUpdate locks the row for the room and returns the last_event_id. -// The row must already exist in the table. Callers can ensure that the row -// exists by calling insertRoom first. func (s *blacklistStatements) SelectBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) (bool, error) { @@ -96,8 +98,6 @@ func (s *blacklistStatements) SelectBlacklist( return res.Next(), nil } -// updateRoom updates the last_event_id for the room. selectRoomForUpdate should -// have already been called earlier within the transaction. func (s *blacklistStatements) DeleteBlacklist( ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ) error { @@ -105,3 +105,11 @@ func (s *blacklistStatements) DeleteBlacklist( _, err := stmt.ExecContext(ctx, serverName) return err } + +func (s *blacklistStatements) DeleteAllBlacklist( + ctx context.Context, txn *sql.Tx, +) error { + stmt := sqlutil.TxStmt(txn, s.deleteAllBlacklistStmt) + _, err := stmt.ExecContext(ctx) + return err +} diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 34ff0b97..995b6f47 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -60,6 +60,7 @@ type FederationSenderBlacklist interface { InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error + DeleteAllBlacklist(ctx context.Context, txn *sql.Tx) error } type FederationSenderOutboundPeeks interface {