From 4376d4a2e063c1a10467a1779a7c4094cc1a1f14 Mon Sep 17 00:00:00 2001 From: Charlotte Som Date: Wed, 27 Nov 2024 04:51:34 +0200 Subject: [PATCH] add known-good hosts to db entry --- src/indexer.rs | 1 + src/lib.rs | 28 +++++++++++++++++++++++++++- src/main.rs | 14 ++++++-------- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/indexer.rs b/src/indexer.rs index e643734..a609c7a 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -367,6 +367,7 @@ async fn host_subscription(server: Arc, host: String) -> Result<()> bail!("too many repos! ditching from cerulea relay") } + let _ = server.add_good_host(host.clone()).await; let mut subscription = DataServerSubscription::new(server, host); // TODO: load seq from db ? diff --git a/src/lib.rs b/src/lib.rs index 9155b95..db5df54 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ -use std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; +use anyhow::Result; use bytes::Bytes; use tokio::sync::{broadcast, mpsc, Mutex}; use wire_proto::StreamEvent; @@ -9,6 +10,7 @@ pub struct RelayServer { pub db_history: sled::Tree, pub db_users: sled::Tree, + pub known_good_hosts: Mutex>, pub active_indexers: Mutex>, pub event_tx: mpsc::Sender, @@ -18,10 +20,18 @@ pub struct RelayServer { impl RelayServer { pub fn new(db: sled::Db, event_tx: mpsc::Sender) -> Self { let (raw_block_tx, _) = broadcast::channel(128); + + let hosts = db + .get("hosts") + .expect("Failed to read db entry for hosts") + .and_then(|v| serde_ipld_dagcbor::from_slice::>(&v).ok()) + .unwrap_or_default(); + Self { event_tx, raw_block_tx, + known_good_hosts: Mutex::new(hosts.into_iter().collect()), active_indexers: Default::default(), db_history: db @@ -31,6 +41,22 @@ impl RelayServer { db, } } + + pub async fn add_good_host(&self, host: String) -> Result<()> { + let mut hosts = self.known_good_hosts.lock().await; + if hosts.contains(&host) { + return Ok(()); + } + tracing::debug!(%host, "discovered new known-good host"); + + hosts.insert(host); + let hosts_copy = hosts.iter().collect::>(); + let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?; + drop(hosts); + + self.db.insert("hosts", serialized_hosts)?; + Ok(()) + } } pub mod http; diff --git a/src/main.rs b/src/main.rs index c050485..84c2fcb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,15 +28,13 @@ async fn main() -> Result<()> { let (event_tx, event_rx) = mpsc::channel(128); let server = Arc::new(RelayServer::new(db, event_tx)); + let initial_hosts: Vec = { + let hosts = server.known_good_hosts.lock().await; + hosts.iter().cloned().collect() + }; + tracing::debug!(hosts = ?initial_hosts, "got initial hosts"); - let hosts = server - .db - .get("hosts")? - .and_then(|v| serde_ipld_dagcbor::from_slice::>(&v).ok()) - .unwrap_or_default(); - tracing::debug!(?hosts, "got list of hosts"); - - index_servers(Arc::clone(&server), &hosts); + index_servers(Arc::clone(&server), &initial_hosts); start_sequencer(Arc::clone(&server), event_rx); // TODO: configurable bind address