[wip] admin: purge events by did
This commit is contained in:
		
							parent
							
								
									4ec26a9c83
								
							
						
					
					
						commit
						3204162282
					
				
					 3 changed files with 64 additions and 1 deletions
				
			
		
							
								
								
									
										60
									
								
								src/admin.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										60
									
								
								src/admin.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,60 @@ | ||||||
|  | use std::{io::Cursor, sync::Arc}; | ||||||
|  | 
 | ||||||
|  | use anyhow::{bail, Result}; | ||||||
|  | use atrium_api::com::atproto::sync::subscribe_repos; | ||||||
|  | use hyper::{body::Incoming, Request}; | ||||||
|  | use ipld_core::ipld::Ipld; | ||||||
|  | use serde_ipld_dagcbor::DecodeError; | ||||||
|  | 
 | ||||||
|  | use crate::{http::ServerResponse, wire_proto::StreamEventHeader, RelayServer}; | ||||||
|  | 
 | ||||||
|  | pub fn purge_did(server: &RelayServer, did: &str) -> Result<()> { | ||||||
|  |     // drop commits
 | ||||||
|  | 
 | ||||||
|  |     for event in server.db_history.iter() { | ||||||
|  |         let (seq, event) = event?; | ||||||
|  |         let mut cursor = Cursor::new(&event); | ||||||
|  |         let (header_buf, payload_buf) = | ||||||
|  |             match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) { | ||||||
|  |                 Err(DecodeError::TrailingData) => event.split_at(cursor.position() as usize), | ||||||
|  |                 _ => continue, | ||||||
|  |             }; | ||||||
|  | 
 | ||||||
|  |         let header = serde_ipld_dagcbor::from_slice::<StreamEventHeader>(header_buf)?; | ||||||
|  |         let event_did = match header.t.as_deref() { | ||||||
|  |             Some("#commit") => { | ||||||
|  |                 let payload = | ||||||
|  |                     serde_ipld_dagcbor::from_slice::<subscribe_repos::Commit>(payload_buf)?; | ||||||
|  |                 Some(payload.data.repo) | ||||||
|  |             } | ||||||
|  |             Some("#account") => { | ||||||
|  |                 let payload = | ||||||
|  |                     serde_ipld_dagcbor::from_slice::<subscribe_repos::Account>(payload_buf)?; | ||||||
|  |                 Some(payload.data.did) | ||||||
|  |             } | ||||||
|  |             _ => None, | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|  |         if let Some(event_did) = event_did { | ||||||
|  |             if event_did.as_str() == did { | ||||||
|  |                 let _ = server.db_history.remove(seq); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // TODO: ban host
 | ||||||
|  | 
 | ||||||
|  | pub async fn handle_purge_did( | ||||||
|  |     _server: Arc<RelayServer>, | ||||||
|  |     _req: Request<Incoming>, | ||||||
|  | ) -> Result<ServerResponse> { | ||||||
|  |     // TODO:
 | ||||||
|  |     //   - validate admin Authorization header
 | ||||||
|  |     //   - parse JSON body for target did
 | ||||||
|  |     //   - run purge_did function
 | ||||||
|  | 
 | ||||||
|  |     bail!("not yet implemented") | ||||||
|  | } | ||||||
|  | @ -11,6 +11,7 @@ use hyper_util::rt::TokioIo; | ||||||
| use tokio::net::TcpListener; | use tokio::net::TcpListener; | ||||||
| 
 | 
 | ||||||
| use crate::{ | use crate::{ | ||||||
|  |     admin::handle_purge_did, | ||||||
|     relay::{request_crawl::handle_request_crawl, subscribe::handle_subscription}, |     relay::{request_crawl::handle_request_crawl, subscribe::handle_subscription}, | ||||||
|     RelayServer, |     RelayServer, | ||||||
| }; | }; | ||||||
|  | @ -39,11 +40,12 @@ async fn serve(server: Arc<RelayServer>, req: Request<Incoming>) -> Result<Serve | ||||||
|         (&Method::GET, "/xrpc/com.atproto.sync.subscribeRepos") => { |         (&Method::GET, "/xrpc/com.atproto.sync.subscribeRepos") => { | ||||||
|             handle_subscription(server, req).await |             handle_subscription(server, req).await | ||||||
|         } |         } | ||||||
| 
 |  | ||||||
|         (&Method::POST, "/xrpc/com.atproto.sync.requestCrawl") => { |         (&Method::POST, "/xrpc/com.atproto.sync.requestCrawl") => { | ||||||
|             handle_request_crawl(server, req).await |             handle_request_crawl(server, req).await | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  |         (&Method::POST, "/api/admin/purge-did") => handle_purge_did(server, req).await, | ||||||
|  | 
 | ||||||
|         _ => Ok(Response::builder() |         _ => Ok(Response::builder() | ||||||
|             .status(StatusCode::NOT_FOUND) |             .status(StatusCode::NOT_FOUND) | ||||||
|             .header("Content-Type", "text/plain") |             .header("Content-Type", "text/plain") | ||||||
|  |  | ||||||
|  | @ -1,6 +1,7 @@ | ||||||
| mod app_state; | mod app_state; | ||||||
| pub use app_state::*; | pub use app_state::*; | ||||||
| 
 | 
 | ||||||
|  | pub mod admin; | ||||||
| pub mod http; | pub mod http; | ||||||
| pub mod relay; | pub mod relay; | ||||||
| pub mod repo; | pub mod repo; | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue