Compare commits
	
		
			No commits in common. "e9f9e35c9d4283c4f6d766660bf415ccd7978eae" and "b7279d27afa0d270368e4066d2e397b51d6057e4" have entirely different histories.
		
	
	
		
			e9f9e35c9d
			...
			b7279d27af
		
	
		
					 3 changed files with 6 additions and 39 deletions
				
			
		|  | @ -321,27 +321,6 @@ impl DataServerSubscription { | ||||||
| 
 | 
 | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 |  | ||||||
|     fn load_cursor(&mut self) -> Result<()> { |  | ||||||
|         if let Some(saved_cursor) = self.server.db_index_cursors.get(&self.host)? { |  | ||||||
|             let mut cur_buf = [0u8; 8]; |  | ||||||
|             let len = 8.min(saved_cursor.len()); |  | ||||||
|             cur_buf[..len].copy_from_slice(&saved_cursor[..len]); |  | ||||||
|             self.last_seq = Some(i64::from_be_bytes(cur_buf)); |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         Ok(()) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     fn save_cursor(&self) -> Result<()> { |  | ||||||
|         if let Some(cur) = self.last_seq { |  | ||||||
|             self.server |  | ||||||
|                 .db_index_cursors |  | ||||||
|                 .insert(&self.host, &i64::to_be_bytes(cur))?; |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         Ok(()) |  | ||||||
|     } |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn get_repo_count(host: &str) -> Result<usize> { | async fn get_repo_count(host: &str) -> Result<usize> { | ||||||
|  | @ -391,7 +370,7 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()> | ||||||
|     let _ = server.add_good_host(host.clone()).await; |     let _ = server.add_good_host(host.clone()).await; | ||||||
|     let mut subscription = DataServerSubscription::new(server, host); |     let mut subscription = DataServerSubscription::new(server, host); | ||||||
| 
 | 
 | ||||||
|     subscription.load_cursor()?; |     // TODO: load seq from db ?
 | ||||||
| 
 | 
 | ||||||
|     'reconnect: loop { |     'reconnect: loop { | ||||||
|         let mut ws = create_ws_client( |         let mut ws = create_ws_client( | ||||||
|  | @ -421,8 +400,6 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()> | ||||||
|                     if let Err(e) = subscription.handle_event(bytes).await { |                     if let Err(e) = subscription.handle_event(bytes).await { | ||||||
|                         tracing::error!("error handling event (skipping): {e:?}"); |                         tracing::error!("error handling event (skipping): {e:?}"); | ||||||
|                     } |                     } | ||||||
| 
 |  | ||||||
|                     let _ = subscription.save_cursor(); |  | ||||||
|                 } |                 } | ||||||
|                 Ok(frame) if frame.opcode == OpCode::Close => { |                 Ok(frame) if frame.opcode == OpCode::Close => { | ||||||
|                     tracing::debug!("got close frame. reconnecting in 10s"); |                     tracing::debug!("got close frame. reconnecting in 10s"); | ||||||
|  | @ -455,7 +432,7 @@ pub async fn index_server(server: Arc<RelayServer>, host: String) -> Result<()> | ||||||
|     { |     { | ||||||
|         let mut active_indexers = server.active_indexers.lock().await; |         let mut active_indexers = server.active_indexers.lock().await; | ||||||
|         if active_indexers.contains(&host) { |         if active_indexers.contains(&host) { | ||||||
|             bail!("Indexer already running for host"); |             bail!("Indexer already running for host {}", &host); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         active_indexers.insert(host.clone()); |         active_indexers.insert(host.clone()); | ||||||
|  |  | ||||||
|  | @ -9,7 +9,6 @@ pub struct RelayServer { | ||||||
|     pub db: sled::Db, |     pub db: sled::Db, | ||||||
|     pub db_history: sled::Tree, |     pub db_history: sled::Tree, | ||||||
|     pub db_users: sled::Tree, |     pub db_users: sled::Tree, | ||||||
|     pub db_index_cursors: sled::Tree, |  | ||||||
| 
 | 
 | ||||||
|     pub known_good_hosts: Mutex<BTreeSet<String>>, |     pub known_good_hosts: Mutex<BTreeSet<String>>, | ||||||
|     pub active_indexers: Mutex<BTreeSet<String>>, |     pub active_indexers: Mutex<BTreeSet<String>>, | ||||||
|  | @ -39,9 +38,6 @@ impl RelayServer { | ||||||
|                 .open_tree("history") |                 .open_tree("history") | ||||||
|                 .expect("failed to open history tree"), |                 .expect("failed to open history tree"), | ||||||
|             db_users: db.open_tree("users").expect("failed to open users tree"), |             db_users: db.open_tree("users").expect("failed to open users tree"), | ||||||
|             db_index_cursors: db |  | ||||||
|                 .open_tree("index_cursors") |  | ||||||
|                 .expect("failed to ope index_cursors tree"), |  | ||||||
|             db, |             db, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -21,11 +21,8 @@ pub async fn handle_request_crawl( | ||||||
|         Ok(input) => input, |         Ok(input) => input, | ||||||
|         Err(_) => { |         Err(_) => { | ||||||
|             // TODO: surely we can build out an XRPC abstraction or something
 |             // TODO: surely we can build out an XRPC abstraction or something
 | ||||||
|             return Ok(Response::builder() |             return Ok(Response::builder().status(400).body(body_full( | ||||||
|                 .status(400) |                 r#"{ "error": "InvalidRequest", "message": "Failed to parse request body" }"#, | ||||||
|                 .header("Content-Type", "application/json") |  | ||||||
|                 .body(body_full( |  | ||||||
|                     r#"{"error":"InvalidRequest","message":"Failed to parse request body"}"#, |  | ||||||
|             ))?); |             ))?); | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
|  | @ -37,8 +34,5 @@ pub async fn handle_request_crawl( | ||||||
|         } |         } | ||||||
|     }); |     }); | ||||||
| 
 | 
 | ||||||
|     Ok(Response::builder() |     Ok(Response::builder().status(200).body(body_empty())?) | ||||||
|         .status(200) |  | ||||||
|         .header("Content-Type", "application/json") |  | ||||||
|         .body(body_full(r#"{"status":"ok"}"#))?) |  | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue