diff --git a/src/admin.rs b/src/admin.rs new file mode 100644 index 0000000..38a6cbe --- /dev/null +++ b/src/admin.rs @@ -0,0 +1,60 @@ +use std::{io::Cursor, sync::Arc}; + +use anyhow::{bail, Result}; +use atrium_api::com::atproto::sync::subscribe_repos; +use hyper::{body::Incoming, Request}; +use ipld_core::ipld::Ipld; +use serde_ipld_dagcbor::DecodeError; + +use crate::{http::ServerResponse, wire_proto::StreamEventHeader, RelayServer}; + +pub fn purge_did(server: &RelayServer, did: &str) -> Result<()> { + // drop commits + + for event in server.db_history.iter() { + let (seq, event) = event?; + let mut cursor = Cursor::new(&event); + let (header_buf, payload_buf) = + match serde_ipld_dagcbor::from_reader::(&mut cursor) { + Err(DecodeError::TrailingData) => event.split_at(cursor.position() as usize), + _ => continue, + }; + + let header = serde_ipld_dagcbor::from_slice::(header_buf)?; + let event_did = match header.t.as_deref() { + Some("#commit") => { + let payload = + serde_ipld_dagcbor::from_slice::(payload_buf)?; + Some(payload.data.repo) + } + Some("#account") => { + let payload = + serde_ipld_dagcbor::from_slice::(payload_buf)?; + Some(payload.data.did) + } + _ => None, + }; + + if let Some(event_did) = event_did { + if event_did.as_str() == did { + let _ = server.db_history.remove(seq); + } + } + } + + Ok(()) +} + +// TODO: ban host + +pub fn handle_purge_did( + _server: Arc, + _req: Request, +) -> Result { + // TODO: + // - validate admin Authorization header + // - parse JSON body for target did + // - run purge_did function + + bail!("not yet implemented") +} diff --git a/src/lib.rs b/src/lib.rs index 056a4fd..5428156 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ mod app_state; pub use app_state::*; +pub mod admin; pub mod http; pub mod relay; pub mod repo;