Merge branch 'deduplicate-watchers' into 'next'
improvement: deduplicate watchers See merge request famedly/conduit!201
This commit is contained in:
		
						commit
						178451e9b5
					
				
					 1 changed files with 14 additions and 15 deletions
				
			
		|  | @ -4,14 +4,14 @@ use parking_lot::{Mutex, MutexGuard, RwLock}; | |||
| use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; | ||||
| use std::{ | ||||
|     cell::RefCell, | ||||
|     collections::HashMap, | ||||
|     collections::{hash_map, HashMap}, | ||||
|     future::Future, | ||||
|     path::{Path, PathBuf}, | ||||
|     pin::Pin, | ||||
|     sync::Arc, | ||||
| }; | ||||
| use thread_local::ThreadLocal; | ||||
| use tokio::sync::oneshot::Sender; | ||||
| use tokio::sync::watch; | ||||
| use tracing::debug; | ||||
| 
 | ||||
| thread_local! { | ||||
|  | @ -126,7 +126,7 @@ impl DatabaseEngine for Engine { | |||
| pub struct SqliteTable { | ||||
|     engine: Arc<Engine>, | ||||
|     name: String, | ||||
|     watchers: RwLock<HashMap<Vec<u8>, Vec<Sender<()>>>>, | ||||
|     watchers: RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>, | ||||
| } | ||||
| 
 | ||||
| type TupleOfBytes = (Vec<u8>, Vec<u8>); | ||||
|  | @ -215,10 +215,8 @@ impl Tree for SqliteTable { | |||
|         if !triggered.is_empty() { | ||||
|             let mut watchers = self.watchers.write(); | ||||
|             for prefix in triggered { | ||||
|                 if let Some(txs) = watchers.remove(prefix) { | ||||
|                     for tx in txs { | ||||
|                         let _ = tx.send(()); | ||||
|                     } | ||||
|                 if let Some(tx) = watchers.remove(prefix) { | ||||
|                     let _ = tx.0.send(()); | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|  | @ -367,17 +365,18 @@ impl Tree for SqliteTable { | |||
| 
 | ||||
|     #[tracing::instrument(skip(self, prefix))] | ||||
|     fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { | ||||
|         let (tx, rx) = tokio::sync::oneshot::channel(); | ||||
| 
 | ||||
|         self.watchers | ||||
|             .write() | ||||
|             .entry(prefix.to_vec()) | ||||
|             .or_default() | ||||
|             .push(tx); | ||||
|         let mut rx = match self.watchers.write().entry(prefix.to_vec()) { | ||||
|             hash_map::Entry::Occupied(o) => o.get().1.clone(), | ||||
|             hash_map::Entry::Vacant(v) => { | ||||
|                 let (tx, rx) = tokio::sync::watch::channel(()); | ||||
|                 v.insert((tx, rx.clone())); | ||||
|                 rx | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         Box::pin(async move { | ||||
|             // Tx is never destroyed
 | ||||
|             rx.await.unwrap(); | ||||
|             rx.changed().await.unwrap(); | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue