add known-good hosts to db entry
This commit is contained in:
		
							parent
							
								
									a3699c4377
								
							
						
					
					
						commit
						4376d4a2e0
					
				
					 3 changed files with 34 additions and 9 deletions
				
			
		|  | @ -367,6 +367,7 @@ async fn host_subscription(server: Arc<RelayServer>, host: String) -> Result<()> | ||||||
|         bail!("too many repos! ditching from cerulea relay") |         bail!("too many repos! ditching from cerulea relay") | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     let _ = server.add_good_host(host.clone()).await; | ||||||
|     let mut subscription = DataServerSubscription::new(server, host); |     let mut subscription = DataServerSubscription::new(server, host); | ||||||
| 
 | 
 | ||||||
|     // TODO: load seq from db ?
 |     // TODO: load seq from db ?
 | ||||||
|  |  | ||||||
							
								
								
									
										28
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								src/lib.rs
									
									
									
									
									
								
							|  | @ -1,5 +1,6 @@ | ||||||
| use std::collections::HashSet; | use std::collections::{BTreeSet, HashSet}; | ||||||
| 
 | 
 | ||||||
|  | use anyhow::Result; | ||||||
| use bytes::Bytes; | use bytes::Bytes; | ||||||
| use tokio::sync::{broadcast, mpsc, Mutex}; | use tokio::sync::{broadcast, mpsc, Mutex}; | ||||||
| use wire_proto::StreamEvent; | use wire_proto::StreamEvent; | ||||||
|  | @ -9,6 +10,7 @@ pub struct RelayServer { | ||||||
|     pub db_history: sled::Tree, |     pub db_history: sled::Tree, | ||||||
|     pub db_users: sled::Tree, |     pub db_users: sled::Tree, | ||||||
| 
 | 
 | ||||||
|  |     pub known_good_hosts: Mutex<BTreeSet<String>>, | ||||||
|     pub active_indexers: Mutex<HashSet<String>>, |     pub active_indexers: Mutex<HashSet<String>>, | ||||||
| 
 | 
 | ||||||
|     pub event_tx: mpsc::Sender<StreamEvent>, |     pub event_tx: mpsc::Sender<StreamEvent>, | ||||||
|  | @ -18,10 +20,18 @@ pub struct RelayServer { | ||||||
| impl RelayServer { | impl RelayServer { | ||||||
|     pub fn new(db: sled::Db, event_tx: mpsc::Sender<StreamEvent>) -> Self { |     pub fn new(db: sled::Db, event_tx: mpsc::Sender<StreamEvent>) -> Self { | ||||||
|         let (raw_block_tx, _) = broadcast::channel(128); |         let (raw_block_tx, _) = broadcast::channel(128); | ||||||
|  | 
 | ||||||
|  |         let hosts = db | ||||||
|  |             .get("hosts") | ||||||
|  |             .expect("Failed to read db entry for hosts") | ||||||
|  |             .and_then(|v| serde_ipld_dagcbor::from_slice::<Vec<String>>(&v).ok()) | ||||||
|  |             .unwrap_or_default(); | ||||||
|  | 
 | ||||||
|         Self { |         Self { | ||||||
|             event_tx, |             event_tx, | ||||||
|             raw_block_tx, |             raw_block_tx, | ||||||
| 
 | 
 | ||||||
|  |             known_good_hosts: Mutex::new(hosts.into_iter().collect()), | ||||||
|             active_indexers: Default::default(), |             active_indexers: Default::default(), | ||||||
| 
 | 
 | ||||||
|             db_history: db |             db_history: db | ||||||
|  | @ -31,6 +41,22 @@ impl RelayServer { | ||||||
|             db, |             db, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     pub async fn add_good_host(&self, host: String) -> Result<()> { | ||||||
|  |         let mut hosts = self.known_good_hosts.lock().await; | ||||||
|  |         if hosts.contains(&host) { | ||||||
|  |             return Ok(()); | ||||||
|  |         } | ||||||
|  |         tracing::debug!(%host, "discovered new known-good host"); | ||||||
|  | 
 | ||||||
|  |         hosts.insert(host); | ||||||
|  |         let hosts_copy = hosts.iter().collect::<Vec<_>>(); | ||||||
|  |         let serialized_hosts = serde_ipld_dagcbor::to_vec(&hosts_copy)?; | ||||||
|  |         drop(hosts); | ||||||
|  | 
 | ||||||
|  |         self.db.insert("hosts", serialized_hosts)?; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub mod http; | pub mod http; | ||||||
|  |  | ||||||
							
								
								
									
										14
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								src/main.rs
									
									
									
									
									
								
							|  | @ -28,15 +28,13 @@ async fn main() -> Result<()> { | ||||||
|     let (event_tx, event_rx) = mpsc::channel(128); |     let (event_tx, event_rx) = mpsc::channel(128); | ||||||
| 
 | 
 | ||||||
|     let server = Arc::new(RelayServer::new(db, event_tx)); |     let server = Arc::new(RelayServer::new(db, event_tx)); | ||||||
|  |     let initial_hosts: Vec<String> = { | ||||||
|  |         let hosts = server.known_good_hosts.lock().await; | ||||||
|  |         hosts.iter().cloned().collect() | ||||||
|  |     }; | ||||||
|  |     tracing::debug!(hosts = ?initial_hosts, "got initial hosts"); | ||||||
| 
 | 
 | ||||||
|     let hosts = server |     index_servers(Arc::clone(&server), &initial_hosts); | ||||||
|         .db |  | ||||||
|         .get("hosts")? |  | ||||||
|         .and_then(|v| serde_ipld_dagcbor::from_slice::<Vec<String>>(&v).ok()) |  | ||||||
|         .unwrap_or_default(); |  | ||||||
|     tracing::debug!(?hosts, "got list of hosts"); |  | ||||||
| 
 |  | ||||||
|     index_servers(Arc::clone(&server), &hosts); |  | ||||||
|     start_sequencer(Arc::clone(&server), event_rx); |     start_sequencer(Arc::clone(&server), event_rx); | ||||||
| 
 | 
 | ||||||
|     // TODO: configurable bind address
 |     // TODO: configurable bind address
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue