load and save indexer cursor
parent
b7279d27af
commit
41450ec342
|
@ -321,6 +321,27 @@ impl DataServerSubscription {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn load_cursor(&mut self) -> Result<()> {
|
||||||
|
if let Some(saved_cursor) = self.server.db_index_cursors.get(&self.host)? {
|
||||||
|
let mut cur_buf = [0u8; 8];
|
||||||
|
let len = 8.min(saved_cursor.len());
|
||||||
|
cur_buf[..len].copy_from_slice(&saved_cursor[..len]);
|
||||||
|
self.last_seq = Some(i64::from_be_bytes(cur_buf));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn save_cursor(&self) -> Result<()> {
|
||||||
|
if let Some(cur) = self.last_seq {
|
||||||
|
self.server
|
||||||
|
.db_index_cursors
|
||||||
|
.insert(&self.host, &i64::to_be_bytes(cur))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_repo_count(host: &str) -> Result<usize> {
|
async fn get_repo_count(host: &str) -> Result<usize> {
|
||||||
|
@ -370,7 +391,7 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()>
|
||||||
let _ = server.add_good_host(host.clone()).await;
|
let _ = server.add_good_host(host.clone()).await;
|
||||||
let mut subscription = DataServerSubscription::new(server, host);
|
let mut subscription = DataServerSubscription::new(server, host);
|
||||||
|
|
||||||
// TODO: load seq from db ?
|
subscription.load_cursor()?;
|
||||||
|
|
||||||
'reconnect: loop {
|
'reconnect: loop {
|
||||||
let mut ws = create_ws_client(
|
let mut ws = create_ws_client(
|
||||||
|
@ -400,6 +421,8 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()>
|
||||||
if let Err(e) = subscription.handle_event(bytes).await {
|
if let Err(e) = subscription.handle_event(bytes).await {
|
||||||
tracing::error!("error handling event (skipping): {e:?}");
|
tracing::error!("error handling event (skipping): {e:?}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let _ = subscription.save_cursor();
|
||||||
}
|
}
|
||||||
Ok(frame) if frame.opcode == OpCode::Close => {
|
Ok(frame) if frame.opcode == OpCode::Close => {
|
||||||
tracing::debug!("got close frame. reconnecting in 10s");
|
tracing::debug!("got close frame. reconnecting in 10s");
|
||||||
|
|
|
@ -9,6 +9,7 @@ pub struct RelayServer {
|
||||||
pub db: sled::Db,
|
pub db: sled::Db,
|
||||||
pub db_history: sled::Tree,
|
pub db_history: sled::Tree,
|
||||||
pub db_users: sled::Tree,
|
pub db_users: sled::Tree,
|
||||||
|
pub db_index_cursors: sled::Tree,
|
||||||
|
|
||||||
pub known_good_hosts: Mutex<BTreeSet<String>>,
|
pub known_good_hosts: Mutex<BTreeSet<String>>,
|
||||||
pub active_indexers: Mutex<BTreeSet<String>>,
|
pub active_indexers: Mutex<BTreeSet<String>>,
|
||||||
|
@ -38,6 +39,9 @@ impl RelayServer {
|
||||||
.open_tree("history")
|
.open_tree("history")
|
||||||
.expect("failed to open history tree"),
|
.expect("failed to open history tree"),
|
||||||
db_users: db.open_tree("users").expect("failed to open users tree"),
|
db_users: db.open_tree("users").expect("failed to open users tree"),
|
||||||
|
db_index_cursors: db
|
||||||
|
.open_tree("index_cursors")
|
||||||
|
.expect("failed to ope index_cursors tree"),
|
||||||
db,
|
db,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue