diff --git a/src/admin.rs b/src/admin.rs new file mode 100644 index 0000000..23ef87b --- /dev/null +++ b/src/admin.rs @@ -0,0 +1,60 @@ +use std::{io::Cursor, sync::Arc}; + +use anyhow::{bail, Result}; +use atrium_api::com::atproto::sync::subscribe_repos; +use hyper::{body::Incoming, Request}; +use ipld_core::ipld::Ipld; +use serde_ipld_dagcbor::DecodeError; + +use crate::{http::ServerResponse, wire_proto::StreamEventHeader, RelayServer}; + +pub fn purge_did(server: &RelayServer, did: &str) -> Result<()> { + // drop commits + + for event in server.db_history.iter() { + let (seq, event) = event?; + let mut cursor = Cursor::new(&event); + let (header_buf, payload_buf) = + match serde_ipld_dagcbor::from_reader::(&mut cursor) { + Err(DecodeError::TrailingData) => event.split_at(cursor.position() as usize), + _ => continue, + }; + + let header = serde_ipld_dagcbor::from_slice::(header_buf)?; + let event_did = match header.t.as_deref() { + Some("#commit") => { + let payload = + serde_ipld_dagcbor::from_slice::(payload_buf)?; + Some(payload.data.repo) + } + Some("#account") => { + let payload = + serde_ipld_dagcbor::from_slice::(payload_buf)?; + Some(payload.data.did) + } + _ => None, + }; + + if let Some(event_did) = event_did { + if event_did.as_str() == did { + let _ = server.db_history.remove(seq); + } + } + } + + Ok(()) +} + +// TODO: ban host + +pub async fn handle_purge_did( + _server: Arc, + _req: Request, +) -> Result { + // TODO: + // - validate admin Authorization header + // - parse JSON body for target did + // - run purge_did function + + bail!("not yet implemented") +} diff --git a/src/http.rs b/src/http.rs index 2122d13..b577199 100644 --- a/src/http.rs +++ b/src/http.rs @@ -11,6 +11,7 @@ use hyper_util::rt::TokioIo; use tokio::net::TcpListener; use crate::{ + admin::handle_purge_did, relay::{request_crawl::handle_request_crawl, subscribe::handle_subscription}, RelayServer, }; @@ -39,11 +40,12 @@ async fn serve(server: Arc, req: Request) -> Result { handle_subscription(server, req).await } - (&Method::POST, "/xrpc/com.atproto.sync.requestCrawl") => { handle_request_crawl(server, req).await } + (&Method::POST, "/api/admin/purge-did") => handle_purge_did(server, req).await, + _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .header("Content-Type", "text/plain") diff --git a/src/lib.rs b/src/lib.rs index 056a4fd..5428156 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ mod app_state; pub use app_state::*; +pub mod admin; pub mod http; pub mod relay; pub mod repo;