move some shit around

This commit is contained in:
Charlotte Som 2024-12-02 07:46:27 +00:00
parent 95ae1c09cb
commit 34f09c8a3d
9 changed files with 101 additions and 96 deletions

77
src/app_state.rs Normal file
View file

@ -0,0 +1,77 @@
use std::{borrow::Cow, collections::BTreeSet};
use anyhow::Result;
use bytes::Bytes;
use tokio::sync::{broadcast, mpsc, Mutex};
use crate::wire_proto::StreamEvent;
pub struct RelayServer {
pub db: sled::Db,
pub db_history: sled::Tree,
pub db_users: sled::Tree,
pub db_index_cursors: sled::Tree,
pub plc_resolver: Cow<'static, str>,
pub known_good_hosts: Mutex<BTreeSet<String>>,
pub active_indexers: Mutex<BTreeSet<String>>,
pub event_tx: mpsc::Sender<StreamEvent>,
pub raw_block_tx: broadcast::Sender<Bytes>,
}
impl RelayServer {
pub fn new(db: sled::Db, event_tx: mpsc::Sender<StreamEvent>) -> Self {
let (raw_block_tx, _) = broadcast::channel(128);
let hosts = db
.get("hosts")
.expect("Failed to read db entry for hosts")
.and_then(|v| serde_ipld_dagcbor::from_slice::<Vec<String>>(&v).ok())
.unwrap_or_default();
Self {
event_tx,
raw_block_tx,
plc_resolver: Cow::Borrowed("plc.directory"),
known_good_hosts: Mutex::new(hosts.into_iter().collect()),
active_indexers: Default::default(),
db_history: db
.open_tree("history")
.expect("failed to open history tree"),
db_users: db.open_tree("users").expect("failed to open users tree"),
db_index_cursors: db
.open_tree("index_cursors")
.expect("failed to ope index_cursors tree"),
db,
}
}
pub async fn add_good_host(&self, host: String) -> Result<()> {
let mut hosts = self.known_good_hosts.lock().await;
if hosts.contains(&host) {
return Ok(());
}
tracing::debug!(%host, "discovered new known-good host");
hosts.insert(host);
let hosts_copy = hosts.iter().collect::<Vec<_>>();
let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?;
drop(hosts);
self.db.insert("hosts", serialized_hosts)?;
Ok(())
}
pub async fn remove_good_host(&self, host: String) -> Result<()> {
tracing::debug!(%host, "dropping known-good host");
let mut hosts = self.known_good_hosts.lock().await;
hosts.remove(&host);
let hosts_copy = hosts.iter().collect::<Vec<_>>();
let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?;
drop(hosts);
self.db.insert("hosts", serialized_hosts)?;
Ok(())
}
}

View file

@ -1,7 +1,3 @@
use crate::{
relay_subscription::handle_subscription, request_crawl::handle_request_crawl, RelayServer,
};
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use anyhow::Result; use anyhow::Result;
@ -14,6 +10,11 @@ use hyper::{
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use crate::{
relay::{request_crawl::handle_request_crawl, subscribe::handle_subscription},
RelayServer,
};
pub type HttpBody = BoxBody<Bytes, hyper::Error>; pub type HttpBody = BoxBody<Bytes, hyper::Error>;
pub fn body_empty() -> HttpBody { pub fn body_empty() -> HttpBody {
Empty::<Bytes>::new().map_err(|e| match e {}).boxed() Empty::<Bytes>::new().map_err(|e| match e {}).boxed()

View file

@ -1,84 +1,8 @@
use std::{borrow::Cow, collections::BTreeSet}; mod app_state;
pub use app_state::*;
use anyhow::Result;
use bytes::Bytes;
use tokio::sync::{broadcast, mpsc, Mutex};
use wire_proto::StreamEvent;
pub struct RelayServer {
pub db: sled::Db,
pub db_history: sled::Tree,
pub db_users: sled::Tree,
pub db_index_cursors: sled::Tree,
pub plc_resolver: Cow<'static, str>,
pub known_good_hosts: Mutex<BTreeSet<String>>,
pub active_indexers: Mutex<BTreeSet<String>>,
pub event_tx: mpsc::Sender<StreamEvent>,
pub raw_block_tx: broadcast::Sender<Bytes>,
}
impl RelayServer {
pub fn new(db: sled::Db, event_tx: mpsc::Sender<StreamEvent>) -> Self {
let (raw_block_tx, _) = broadcast::channel(128);
let hosts = db
.get("hosts")
.expect("Failed to read db entry for hosts")
.and_then(|v| serde_ipld_dagcbor::from_slice::<Vec<String>>(&v).ok())
.unwrap_or_default();
Self {
event_tx,
raw_block_tx,
plc_resolver: Cow::Borrowed("plc.directory"),
known_good_hosts: Mutex::new(hosts.into_iter().collect()),
active_indexers: Default::default(),
db_history: db
.open_tree("history")
.expect("failed to open history tree"),
db_users: db.open_tree("users").expect("failed to open users tree"),
db_index_cursors: db
.open_tree("index_cursors")
.expect("failed to ope index_cursors tree"),
db,
}
}
pub async fn add_good_host(&self, host: String) -> Result<()> {
let mut hosts = self.known_good_hosts.lock().await;
if hosts.contains(&host) {
return Ok(());
}
tracing::debug!(%host, "discovered new known-good host");
hosts.insert(host);
let hosts_copy = hosts.iter().collect::<Vec<_>>();
let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?;
drop(hosts);
self.db.insert("hosts", serialized_hosts)?;
Ok(())
}
pub async fn remove_good_host(&self, host: String) -> Result<()> {
tracing::debug!(%host, "dropping known-good host");
let mut hosts = self.known_good_hosts.lock().await;
hosts.remove(&host);
let hosts_copy = hosts.iter().collect::<Vec<_>>();
let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?;
drop(hosts);
self.db.insert("hosts", serialized_hosts)?;
Ok(())
}
}
pub mod http; pub mod http;
pub mod indexer; pub mod relay;
pub mod relay_subscription;
pub mod request_crawl;
pub mod sequencer; pub mod sequencer;
pub mod tls; pub mod tls;
pub mod user; pub mod user;

View file

@ -6,7 +6,7 @@ use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use cerulea_relay::{ use cerulea_relay::{
http::{self}, http::{self},
indexer::index_servers, relay::index::index_servers,
sequencer::start_sequencer, sequencer::start_sequencer,
RelayServer, RelayServer,
}; };

View file

@ -469,16 +469,20 @@ pub async fn index_server(server: Arc<RelayServer>, host: String) -> Result<()>
r r
} }
pub fn start_indexing_server(server: Arc<RelayServer>, host: String) {
tokio::task::spawn(async move {
if let Err(e) = index_server(server, host.clone()).await {
tracing::warn!(%host, "encountered error subscribing to PDS: {e:?}");
}
});
}
pub fn index_servers(server: Arc<RelayServer>, hosts: &[String]) { pub fn index_servers(server: Arc<RelayServer>, hosts: &[String]) {
// in future we will spider out but right now i just want da stuff from my PDS // in future we will spider out but right now i just want da stuff from my PDS
for host in hosts.iter() { for host in hosts.iter() {
let host = host.to_string(); let host = host.to_string();
let server = Arc::clone(&server); let server = Arc::clone(&server);
tokio::task::spawn(async move { start_indexing_server(server, host);
if let Err(e) = index_server(server, host.clone()).await {
tracing::warn!(%host, "encountered error subscribing to PDS: {e:?}");
}
});
} }
} }

3
src/relay/mod.rs Normal file
View file

@ -0,0 +1,3 @@
pub mod index;
pub mod request_crawl;
pub mod subscribe;

View file

@ -8,7 +8,7 @@ use hyper::{body::Incoming, Request, Response};
use crate::{ use crate::{
http::{body_full, ServerResponse}, http::{body_full, ServerResponse},
indexer::index_server, relay::index::start_indexing_server,
RelayServer, RelayServer,
}; };
@ -31,11 +31,7 @@ pub async fn handle_request_crawl(
}; };
let hostname = input.data.hostname; let hostname = input.data.hostname;
tokio::task::spawn(async move { start_indexing_server(server, hostname);
if let Err(e) = index_server(server, hostname.clone()).await {
tracing::warn!(host = %hostname, "encountered error subscribing to PDS: {e:?}");
}
});
Ok(Response::builder() Ok(Response::builder()
.status(200) .status(200)

View file

@ -129,7 +129,7 @@ pub async fn lookup_user(server: &RelayServer, did: &str) -> Result<User> {
return Ok(cached_user); return Ok(cached_user);
} }
return fetch_user(server, did).await; fetch_user(server, did).await
} }
pub fn store_user(server: &RelayServer, user: &User) -> Result<()> { pub fn store_user(server: &RelayServer, user: &User) -> Result<()> {