From 6b721763d1c4cf4f6dfcf02156b748a2c6db3084 Mon Sep 17 00:00:00 2001 From: Charlotte Som Date: Tue, 26 Nov 2024 23:40:12 +0200 Subject: [PATCH] implement cursor catchup --- src/relay_subscription.rs | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/src/relay_subscription.rs b/src/relay_subscription.rs index 093d956..d9f0aef 100644 --- a/src/relay_subscription.rs +++ b/src/relay_subscription.rs @@ -125,7 +125,7 @@ async fn run_subscription( ws: WebSocket>, ) { let query = req.uri().query().map(QString::from); - let cursor: Option = query + let cursor: Option = query .as_ref() .and_then(|q| q.get("cursor")) .and_then(|s| s.parse().ok()); @@ -134,13 +134,33 @@ async fn run_subscription( tracing::debug!(id = %sub.id, "subscription started"); - /* if let Some(cursor) = cursor { + if let Some(cursor) = cursor { tracing::debug!(id = %sub.id, "subscription filling from cache"); - // TODO: cursor catchup (read from server db history) + let cursor_bytes = cursor.to_be_bytes(); + let mut count = 0; + for event in server.db_history.range(cursor_bytes..) { + let (_seq, event) = match event { + Ok(ev) => ev, + Err(e) => { + tracing::warn!("history err: {e:?}"); + break; + } + }; - tracing::debug!(id = %sub.id, "subscription live-tailing"); - } */ + if let Err(e) = ws_tx + .write_frame(Frame::binary(Payload::Borrowed(&event))) + .await + { + tracing::warn!("history send err: {e:?}"); + break; + }; + + count += 1; + } + + tracing::debug!(sent = %count, id = %sub.id, "subscription live-tailing"); + } // live tailing: let mut raw_block_rx = server.raw_block_tx.subscribe();