diff --git a/src/relay_subscription.rs b/src/relay_subscription.rs index 093d956..d9f0aef 100644 --- a/src/relay_subscription.rs +++ b/src/relay_subscription.rs @@ -125,7 +125,7 @@ async fn run_subscription( ws: WebSocket>, ) { let query = req.uri().query().map(QString::from); - let cursor: Option = query + let cursor: Option = query .as_ref() .and_then(|q| q.get("cursor")) .and_then(|s| s.parse().ok()); @@ -134,13 +134,33 @@ async fn run_subscription( tracing::debug!(id = %sub.id, "subscription started"); - /* if let Some(cursor) = cursor { + if let Some(cursor) = cursor { tracing::debug!(id = %sub.id, "subscription filling from cache"); - // TODO: cursor catchup (read from server db history) + let cursor_bytes = cursor.to_be_bytes(); + let mut count = 0; + for event in server.db_history.range(cursor_bytes..) { + let (_seq, event) = match event { + Ok(ev) => ev, + Err(e) => { + tracing::warn!("history err: {e:?}"); + break; + } + }; - tracing::debug!(id = %sub.id, "subscription live-tailing"); - } */ + if let Err(e) = ws_tx + .write_frame(Frame::binary(Payload::Borrowed(&event))) + .await + { + tracing::warn!("history send err: {e:?}"); + break; + }; + + count += 1; + } + + tracing::debug!(sent = %count, id = %sub.id, "subscription live-tailing"); + } // live tailing: let mut raw_block_rx = server.raw_block_tx.subscribe();