Compare commits

...

3 commits

5 changed files with 189 additions and 23 deletions

143
Cargo.lock generated
View file

@ -41,6 +41,55 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125"
dependencies = [
"anstyle",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.93" version = "1.0.93"
@ -238,6 +287,7 @@ dependencies = [
"anyhow", "anyhow",
"atrium-api", "atrium-api",
"bytes", "bytes",
"clap",
"fastwebsockets", "fastwebsockets",
"http-body-util", "http-body-util",
"hyper", "hyper",
@ -313,6 +363,46 @@ dependencies = [
"libloading", "libloading",
] ]
[[package]]
name = "clap"
version = "4.5.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.5.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.89",
]
[[package]]
name = "clap_lex"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7"
[[package]] [[package]]
name = "cmake" name = "cmake"
version = "0.1.51" version = "0.1.51"
@ -322,6 +412,12 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "colorchoice"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]] [[package]]
name = "core-foundation-sys" name = "core-foundation-sys"
version = "0.8.7" version = "0.8.7"
@ -441,7 +537,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -601,6 +697,12 @@ version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3"
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.3.9" version = "0.3.9"
@ -613,7 +715,7 @@ version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
dependencies = [ dependencies = [
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -754,6 +856,12 @@ dependencies = [
"serde_bytes", "serde_bytes",
] ]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.12.1" version = "0.12.1"
@ -885,7 +993,7 @@ dependencies = [
"hermit-abi", "hermit-abi",
"libc", "libc",
"wasi", "wasi",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -1202,7 +1310,7 @@ dependencies = [
"libc", "libc",
"spin", "spin",
"untrusted", "untrusted",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -1227,7 +1335,7 @@ dependencies = [
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -1421,7 +1529,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -1430,6 +1538,12 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]] [[package]]
name = "subtle" name = "subtle"
version = "2.6.1" version = "2.6.1"
@ -1503,7 +1617,7 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
"windows-sys", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@ -1655,6 +1769,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.11.0" version = "1.11.0"
@ -1807,6 +1927,15 @@ dependencies = [
"windows-targets", "windows-targets",
] ]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets",
]
[[package]] [[package]]
name = "windows-targets" name = "windows-targets"
version = "0.52.6" version = "0.52.6"

View file

@ -7,6 +7,7 @@ edition = "2021"
anyhow = "1.0.93" anyhow = "1.0.93"
atrium-api = { version = "0.24.8", default-features = false, features = ["tokio"] } atrium-api = { version = "0.24.8", default-features = false, features = ["tokio"] }
bytes = { version = "1.8.0", features = ["serde"] } bytes = { version = "1.8.0", features = ["serde"] }
clap = { version = "4.5.21", features = ["derive"] }
fastwebsockets = { version = "0.8.0", features = ["hyper", "unstable-split", "upgrade"] } fastwebsockets = { version = "0.8.0", features = ["hyper", "unstable-split", "upgrade"] }
http-body-util = "0.1.2" http-body-util = "0.1.2"
hyper = { version = "1.5.1", features = ["client", "full", "http1", "http2", "server"] } hyper = { version = "1.5.1", features = ["client", "full", "http1", "http2", "server"] }

View file

@ -367,6 +367,7 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()>
bail!("too many repos! ditching from cerulea relay") bail!("too many repos! ditching from cerulea relay")
} }
let _ = server.add_good_host(host.clone()).await;
let mut subscription = DataServerSubscription::new(server, host); let mut subscription = DataServerSubscription::new(server, host);
// TODO: load seq from db ? // TODO: load seq from db ?

View file

@ -1,5 +1,6 @@
use std::collections::HashSet; use std::collections::BTreeSet;
use anyhow::Result;
use bytes::Bytes; use bytes::Bytes;
use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::sync::{broadcast, mpsc, Mutex};
use wire_proto::StreamEvent; use wire_proto::StreamEvent;
@ -9,7 +10,8 @@ pub struct RelayServer {
pub db_history: sled::Tree, pub db_history: sled::Tree,
pub db_users: sled::Tree, pub db_users: sled::Tree,
pub active_indexers: Mutex<HashSet<String>>, pub known_good_hosts: Mutex<BTreeSet<String>>,
pub active_indexers: Mutex<BTreeSet<String>>,
pub event_tx: mpsc::Sender<StreamEvent>, pub event_tx: mpsc::Sender<StreamEvent>,
pub raw_block_tx: broadcast::Sender<Bytes>, pub raw_block_tx: broadcast::Sender<Bytes>,
@ -18,10 +20,18 @@ pub struct RelayServer {
impl RelayServer { impl RelayServer {
pub fn new(db: sled::Db, event_tx: mpsc::Sender<StreamEvent>) -> Self { pub fn new(db: sled::Db, event_tx: mpsc::Sender<StreamEvent>) -> Self {
let (raw_block_tx, _) = broadcast::channel(128); let (raw_block_tx, _) = broadcast::channel(128);
let hosts = db
.get("hosts")
.expect("Failed to read db entry for hosts")
.and_then(|v| serde_ipld_dagcbor::from_slice::<Vec<String>>(&v).ok())
.unwrap_or_default();
Self { Self {
event_tx, event_tx,
raw_block_tx, raw_block_tx,
known_good_hosts: Mutex::new(hosts.into_iter().collect()),
active_indexers: Default::default(), active_indexers: Default::default(),
db_history: db db_history: db
@ -31,6 +41,22 @@ impl RelayServer {
db, db,
} }
} }
pub async fn add_good_host(&self, host: String) -> Result<()> {
let mut hosts = self.known_good_hosts.lock().await;
if hosts.contains(&host) {
return Ok(());
}
tracing::debug!(%host, "discovered new known-good host");
hosts.insert(host);
let hosts_copy = hosts.iter().collect::<Vec<_>>();
let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?;
drop(hosts);
self.db.insert("hosts", serialized_hosts)?;
Ok(())
}
} }
pub mod http; pub mod http;

View file

@ -1,4 +1,5 @@
use anyhow::Result; use anyhow::Result;
use clap::Parser;
use std::{net::SocketAddr, str::FromStr, sync::Arc}; use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@ -10,8 +11,22 @@ use cerulea_relay::{
RelayServer, RelayServer,
}; };
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// Bind address for the service
#[arg(short, long, default_value = "127.0.0.1:3000")]
bind_addr: SocketAddr,
/// Database RAM cache capacity in megabytes
#[arg(long, default_value_t = 1024)]
cache_capacity: u64,
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let args = Args::parse();
tracing_subscriber::registry() tracing_subscriber::registry()
.with(fmt::layer()) .with(fmt::layer())
.with(EnvFilter::from_str("cerulea_relay=debug").unwrap()) .with(EnvFilter::from_str("cerulea_relay=debug").unwrap())
@ -19,8 +34,7 @@ async fn main() -> Result<()> {
let db = sled::Config::default() let db = sled::Config::default()
.path("data") .path("data")
// TODO: configurable cache capacity .cache_capacity(args.cache_capacity * 1024 * 1024)
.cache_capacity(1024 * 1024 * 1024)
.use_compression(true) .use_compression(true)
.open() .open()
.expect("Failed to open database"); .expect("Failed to open database");
@ -28,20 +42,15 @@ async fn main() -> Result<()> {
let (event_tx, event_rx) = mpsc::channel(128); let (event_tx, event_rx) = mpsc::channel(128);
let server = Arc::new(RelayServer::new(db, event_tx)); let server = Arc::new(RelayServer::new(db, event_tx));
let initial_hosts: Vec<String> = {
let hosts = server.known_good_hosts.lock().await;
hosts.iter().cloned().collect()
};
tracing::debug!(hosts = ?initial_hosts, "got initial hosts");
let hosts = server index_servers(Arc::clone(&server), &initial_hosts);
.db
.get("hosts")?
.and_then(|v| serde_ipld_dagcbor::from_slice::<Vec<String>>(&v).ok())
.unwrap_or_default();
tracing::debug!(?hosts, "got list of hosts");
index_servers(Arc::clone(&server), &hosts);
start_sequencer(Arc::clone(&server), event_rx); start_sequencer(Arc::clone(&server), event_rx);
// TODO: configurable bind address http::listen(server, args.bind_addr).await?;
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
http::listen(server, addr).await?;
Ok(()) Ok(())
} }