From 41450ec342cde1cb57d3ca46ede9050132e66007 Mon Sep 17 00:00:00 2001 From: Charlotte Som Date: Wed, 27 Nov 2024 08:24:00 +0200 Subject: [PATCH] load and save indexer cursor --- src/indexer.rs | 25 ++++++++++++++++++++++++- src/lib.rs | 4 ++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/indexer.rs b/src/indexer.rs index 2adca0c..793c582 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -321,6 +321,27 @@ impl DataServerSubscription { Ok(()) } + + fn load_cursor(&mut self) -> Result<()> { + if let Some(saved_cursor) = self.server.db_index_cursors.get(&self.host)? { + let mut cur_buf = [0u8; 8]; + let len = 8.min(saved_cursor.len()); + cur_buf[..len].copy_from_slice(&saved_cursor[..len]); + self.last_seq = Some(i64::from_be_bytes(cur_buf)); + } + + Ok(()) + } + + fn save_cursor(&self) -> Result<()> { + if let Some(cur) = self.last_seq { + self.server + .db_index_cursors + .insert(&self.host, &i64::to_be_bytes(cur))?; + } + + Ok(()) + } } async fn get_repo_count(host: &str) -> Result { @@ -370,7 +391,7 @@ async fn host_subscription(server: Arc, host: String) -> Result<()> let _ = server.add_good_host(host.clone()).await; let mut subscription = DataServerSubscription::new(server, host); - // TODO: load seq from db ? + subscription.load_cursor()?; 'reconnect: loop { let mut ws = create_ws_client( @@ -400,6 +421,8 @@ async fn host_subscription(server: Arc, host: String) -> Result<()> if let Err(e) = subscription.handle_event(bytes).await { tracing::error!("error handling event (skipping): {e:?}"); } + + let _ = subscription.save_cursor(); } Ok(frame) if frame.opcode == OpCode::Close => { tracing::debug!("got close frame. reconnecting in 10s"); diff --git a/src/lib.rs b/src/lib.rs index c540530..e3a0706 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub struct RelayServer { pub db: sled::Db, pub db_history: sled::Tree, pub db_users: sled::Tree, + pub db_index_cursors: sled::Tree, pub known_good_hosts: Mutex>, pub active_indexers: Mutex>, @@ -38,6 +39,9 @@ impl RelayServer { .open_tree("history") .expect("failed to open history tree"), db_users: db.open_tree("users").expect("failed to open users tree"), + db_index_cursors: db + .open_tree("index_cursors") + .expect("failed to ope index_cursors tree"), db, } }