Compare commits

..

No commits in common. "e9f9e35c9d4283c4f6d766660bf415ccd7978eae" and "b7279d27afa0d270368e4066d2e397b51d6057e4" have entirely different histories.

3 changed files with 6 additions and 39 deletions

View file

@ -321,27 +321,6 @@ impl DataServerSubscription {
Ok(())
}
fn load_cursor(&mut self) -> Result<()> {
if let Some(saved_cursor) = self.server.db_index_cursors.get(&self.host)? {
let mut cur_buf = [0u8; 8];
let len = 8.min(saved_cursor.len());
cur_buf[..len].copy_from_slice(&saved_cursor[..len]);
self.last_seq = Some(i64::from_be_bytes(cur_buf));
}
Ok(())
}
fn save_cursor(&self) -> Result<()> {
if let Some(cur) = self.last_seq {
self.server
.db_index_cursors
.insert(&self.host, &i64::to_be_bytes(cur))?;
}
Ok(())
}
}
async fn get_repo_count(host: &str) -> Result<usize> {
@ -391,7 +370,7 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()>
let _ = server.add_good_host(host.clone()).await;
let mut subscription = DataServerSubscription::new(server, host);
subscription.load_cursor()?;
// TODO: load seq from db ?
'reconnect: loop {
let mut ws = create_ws_client(
@ -421,8 +400,6 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()>
if let Err(e) = subscription.handle_event(bytes).await {
tracing::error!("error handling event (skipping): {e:?}");
}
let _ = subscription.save_cursor();
}
Ok(frame) if frame.opcode == OpCode::Close => {
tracing::debug!("got close frame. reconnecting in 10s");
@ -455,7 +432,7 @@ pub async fn index_server(server: Arc<RelayServer>, host: String) -> Result<()>
{
let mut active_indexers = server.active_indexers.lock().await;
if active_indexers.contains(&host) {
bail!("Indexer already running for host");
bail!("Indexer already running for host {}", &host);
}
active_indexers.insert(host.clone());

View file

@ -9,7 +9,6 @@ pub struct RelayServer {
pub db: sled::Db,
pub db_history: sled::Tree,
pub db_users: sled::Tree,
pub db_index_cursors: sled::Tree,
pub known_good_hosts: Mutex<BTreeSet<String>>,
pub active_indexers: Mutex<BTreeSet<String>>,
@ -39,9 +38,6 @@ impl RelayServer {
.open_tree("history")
.expect("failed to open history tree"),
db_users: db.open_tree("users").expect("failed to open users tree"),
db_index_cursors: db
.open_tree("index_cursors")
.expect("failed to ope index_cursors tree"),
db,
}
}

View file

@ -21,11 +21,8 @@ pub async fn handle_request_crawl(
Ok(input) => input,
Err(_) => {
// TODO: surely we can build out an XRPC abstraction or something
return Ok(Response::builder()
.status(400)
.header("Content-Type", "application/json")
.body(body_full(
r#"{"error":"InvalidRequest","message":"Failed to parse request body"}"#,
return Ok(Response::builder().status(400).body(body_full(
r#"{ "error": "InvalidRequest", "message": "Failed to parse request body" }"#,
))?);
}
};
@ -37,8 +34,5 @@ pub async fn handle_request_crawl(
}
});
Ok(Response::builder()
.status(200)
.header("Content-Type", "application/json")
.body(body_full(r#"{"status":"ok"}"#))?)
Ok(Response::builder().status(200).body(body_empty())?)
}