diff --git a/src/app_state.rs b/src/app_state.rs new file mode 100644 index 0000000..1b9193e --- /dev/null +++ b/src/app_state.rs @@ -0,0 +1,77 @@ +use std::{borrow::Cow, collections::BTreeSet}; + +use anyhow::Result; +use bytes::Bytes; +use tokio::sync::{broadcast, mpsc, Mutex}; + +use crate::wire_proto::StreamEvent; + +pub struct RelayServer { + pub db: sled::Db, + pub db_history: sled::Tree, + pub db_users: sled::Tree, + pub db_index_cursors: sled::Tree, + + pub plc_resolver: Cow<'static, str>, + pub known_good_hosts: Mutex>, + pub active_indexers: Mutex>, + + pub event_tx: mpsc::Sender, + pub raw_block_tx: broadcast::Sender, +} + +impl RelayServer { + pub fn new(db: sled::Db, event_tx: mpsc::Sender) -> Self { + let (raw_block_tx, _) = broadcast::channel(128); + + let hosts = db + .get("hosts") + .expect("Failed to read db entry for hosts") + .and_then(|v| serde_ipld_dagcbor::from_slice::>(&v).ok()) + .unwrap_or_default(); + + Self { + event_tx, + raw_block_tx, + + plc_resolver: Cow::Borrowed("plc.directory"), + known_good_hosts: Mutex::new(hosts.into_iter().collect()), + active_indexers: Default::default(), + + db_history: db + .open_tree("history") + .expect("failed to open history tree"), + db_users: db.open_tree("users").expect("failed to open users tree"), + db_index_cursors: db + .open_tree("index_cursors") + .expect("failed to ope index_cursors tree"), + db, + } + } + + pub async fn add_good_host(&self, host: String) -> Result<()> { + let mut hosts = self.known_good_hosts.lock().await; + if hosts.contains(&host) { + return Ok(()); + } + tracing::debug!(%host, "discovered new known-good host"); + + hosts.insert(host); + let hosts_copy = hosts.iter().collect::>(); + let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?; + drop(hosts); + self.db.insert("hosts", serialized_hosts)?; + Ok(()) + } + + pub async fn remove_good_host(&self, host: String) -> Result<()> { + tracing::debug!(%host, "dropping known-good host"); + let mut hosts = self.known_good_hosts.lock().await; + hosts.remove(&host); + let hosts_copy = hosts.iter().collect::>(); + let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?; + drop(hosts); + self.db.insert("hosts", serialized_hosts)?; + Ok(()) + } +} diff --git a/src/http.rs b/src/http.rs index 2b08051..2122d13 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,7 +1,3 @@ -use crate::{ - relay_subscription::handle_subscription, request_crawl::handle_request_crawl, RelayServer, -}; - use std::{net::SocketAddr, sync::Arc}; use anyhow::Result; @@ -14,6 +10,11 @@ use hyper::{ use hyper_util::rt::TokioIo; use tokio::net::TcpListener; +use crate::{ + relay::{request_crawl::handle_request_crawl, subscribe::handle_subscription}, + RelayServer, +}; + pub type HttpBody = BoxBody; pub fn body_empty() -> HttpBody { Empty::::new().map_err(|e| match e {}).boxed() diff --git a/src/lib.rs b/src/lib.rs index 94a4f75..a22d5d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,84 +1,8 @@ -use std::{borrow::Cow, collections::BTreeSet}; - -use anyhow::Result; -use bytes::Bytes; -use tokio::sync::{broadcast, mpsc, Mutex}; -use wire_proto::StreamEvent; - -pub struct RelayServer { - pub db: sled::Db, - pub db_history: sled::Tree, - pub db_users: sled::Tree, - pub db_index_cursors: sled::Tree, - - pub plc_resolver: Cow<'static, str>, - pub known_good_hosts: Mutex>, - pub active_indexers: Mutex>, - - pub event_tx: mpsc::Sender, - pub raw_block_tx: broadcast::Sender, -} - -impl RelayServer { - pub fn new(db: sled::Db, event_tx: mpsc::Sender) -> Self { - let (raw_block_tx, _) = broadcast::channel(128); - - let hosts = db - .get("hosts") - .expect("Failed to read db entry for hosts") - .and_then(|v| serde_ipld_dagcbor::from_slice::>(&v).ok()) - .unwrap_or_default(); - - Self { - event_tx, - raw_block_tx, - - plc_resolver: Cow::Borrowed("plc.directory"), - known_good_hosts: Mutex::new(hosts.into_iter().collect()), - active_indexers: Default::default(), - - db_history: db - .open_tree("history") - .expect("failed to open history tree"), - db_users: db.open_tree("users").expect("failed to open users tree"), - db_index_cursors: db - .open_tree("index_cursors") - .expect("failed to ope index_cursors tree"), - db, - } - } - - pub async fn add_good_host(&self, host: String) -> Result<()> { - let mut hosts = self.known_good_hosts.lock().await; - if hosts.contains(&host) { - return Ok(()); - } - tracing::debug!(%host, "discovered new known-good host"); - - hosts.insert(host); - let hosts_copy = hosts.iter().collect::>(); - let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?; - drop(hosts); - self.db.insert("hosts", serialized_hosts)?; - Ok(()) - } - - pub async fn remove_good_host(&self, host: String) -> Result<()> { - tracing::debug!(%host, "dropping known-good host"); - let mut hosts = self.known_good_hosts.lock().await; - hosts.remove(&host); - let hosts_copy = hosts.iter().collect::>(); - let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?; - drop(hosts); - self.db.insert("hosts", serialized_hosts)?; - Ok(()) - } -} +mod app_state; +pub use app_state::*; pub mod http; -pub mod indexer; -pub mod relay_subscription; -pub mod request_crawl; +pub mod relay; pub mod sequencer; pub mod tls; pub mod user; diff --git a/src/main.rs b/src/main.rs index eb1fc5d..3362145 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use cerulea_relay::{ http::{self}, - indexer::index_servers, + relay::index::index_servers, sequencer::start_sequencer, RelayServer, }; diff --git a/src/indexer.rs b/src/relay/index.rs similarity index 97% rename from src/indexer.rs rename to src/relay/index.rs index 337a9f5..cb7a6d7 100644 --- a/src/indexer.rs +++ b/src/relay/index.rs @@ -469,16 +469,20 @@ pub async fn index_server(server: Arc, host: String) -> Result<()> r } +pub fn start_indexing_server(server: Arc, host: String) { + tokio::task::spawn(async move { + if let Err(e) = index_server(server, host.clone()).await { + tracing::warn!(%host, "encountered error subscribing to PDS: {e:?}"); + } + }); +} + pub fn index_servers(server: Arc, hosts: &[String]) { // in future we will spider out but right now i just want da stuff from my PDS for host in hosts.iter() { let host = host.to_string(); let server = Arc::clone(&server); - tokio::task::spawn(async move { - if let Err(e) = index_server(server, host.clone()).await { - tracing::warn!(%host, "encountered error subscribing to PDS: {e:?}"); - } - }); + start_indexing_server(server, host); } } diff --git a/src/relay/mod.rs b/src/relay/mod.rs new file mode 100644 index 0000000..6da3215 --- /dev/null +++ b/src/relay/mod.rs @@ -0,0 +1,3 @@ +pub mod index; +pub mod request_crawl; +pub mod subscribe; diff --git a/src/request_crawl.rs b/src/relay/request_crawl.rs similarity index 82% rename from src/request_crawl.rs rename to src/relay/request_crawl.rs index df136a9..be894a4 100644 --- a/src/request_crawl.rs +++ b/src/relay/request_crawl.rs @@ -8,7 +8,7 @@ use hyper::{body::Incoming, Request, Response}; use crate::{ http::{body_full, ServerResponse}, - indexer::index_server, + relay::index::start_indexing_server, RelayServer, }; @@ -31,11 +31,7 @@ pub async fn handle_request_crawl( }; let hostname = input.data.hostname; - tokio::task::spawn(async move { - if let Err(e) = index_server(server, hostname.clone()).await { - tracing::warn!(host = %hostname, "encountered error subscribing to PDS: {e:?}"); - } - }); + start_indexing_server(server, hostname); Ok(Response::builder() .status(200) diff --git a/src/relay_subscription.rs b/src/relay/subscribe.rs similarity index 100% rename from src/relay_subscription.rs rename to src/relay/subscribe.rs diff --git a/src/user.rs b/src/user.rs index 9956a32..6708c4f 100644 --- a/src/user.rs +++ b/src/user.rs @@ -129,7 +129,7 @@ pub async fn lookup_user(server: &RelayServer, did: &str) -> Result { return Ok(cached_user); } - return fetch_user(server, did).await; + fetch_user(server, did).await } pub fn store_user(server: &RelayServer, user: &User) -> Result<()> {