implement cursor catchup
This commit is contained in:
		
							parent
							
								
									28ecfd6851
								
							
						
					
					
						commit
						6b721763d1
					
				
					 1 changed files with 25 additions and 5 deletions
				
			
		|  | @ -125,7 +125,7 @@ async fn run_subscription( | ||||||
|     ws: WebSocket<TokioIo<Upgraded>>, |     ws: WebSocket<TokioIo<Upgraded>>, | ||||||
| ) { | ) { | ||||||
|     let query = req.uri().query().map(QString::from); |     let query = req.uri().query().map(QString::from); | ||||||
|     let cursor: Option<usize> = query |     let cursor: Option<u128> = query | ||||||
|         .as_ref() |         .as_ref() | ||||||
|         .and_then(|q| q.get("cursor")) |         .and_then(|q| q.get("cursor")) | ||||||
|         .and_then(|s| s.parse().ok()); |         .and_then(|s| s.parse().ok()); | ||||||
|  | @ -134,13 +134,33 @@ async fn run_subscription( | ||||||
| 
 | 
 | ||||||
|     tracing::debug!(id = %sub.id, "subscription started"); |     tracing::debug!(id = %sub.id, "subscription started"); | ||||||
| 
 | 
 | ||||||
|     /* if let Some(cursor) = cursor {
 |     if let Some(cursor) = cursor { | ||||||
|         tracing::debug!(id = %sub.id, "subscription filling from cache"); |         tracing::debug!(id = %sub.id, "subscription filling from cache"); | ||||||
| 
 | 
 | ||||||
|         // TODO: cursor catchup (read from server db history)
 |         let cursor_bytes = cursor.to_be_bytes(); | ||||||
|  |         let mut count = 0; | ||||||
|  |         for event in server.db_history.range(cursor_bytes..) { | ||||||
|  |             let (_seq, event) = match event { | ||||||
|  |                 Ok(ev) => ev, | ||||||
|  |                 Err(e) => { | ||||||
|  |                     tracing::warn!("history err: {e:?}"); | ||||||
|  |                     break; | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
| 
 | 
 | ||||||
|         tracing::debug!(id = %sub.id, "subscription live-tailing"); |             if let Err(e) = ws_tx | ||||||
|     } */ |                 .write_frame(Frame::binary(Payload::Borrowed(&event))) | ||||||
|  |                 .await | ||||||
|  |             { | ||||||
|  |                 tracing::warn!("history send err: {e:?}"); | ||||||
|  |                 break; | ||||||
|  |             }; | ||||||
|  | 
 | ||||||
|  |             count += 1; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         tracing::debug!(sent = %count, id = %sub.id, "subscription live-tailing"); | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     // live tailing:
 |     // live tailing:
 | ||||||
|     let mut raw_block_rx = server.raw_block_tx.subscribe(); |     let mut raw_block_rx = server.raw_block_tx.subscribe(); | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue