improvement: deduplicate watchers
This commit is contained in:
		
							parent
							
								
									06b0c9267f
								
							
						
					
					
						commit
						cbee7fe111
					
				
					 1 changed files with 14 additions and 15 deletions
				
			
		|  | @ -4,14 +4,14 @@ use parking_lot::{Mutex, MutexGuard, RwLock}; | ||||||
| use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; | use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; | ||||||
| use std::{ | use std::{ | ||||||
|     cell::RefCell, |     cell::RefCell, | ||||||
|     collections::HashMap, |     collections::{hash_map, HashMap}, | ||||||
|     future::Future, |     future::Future, | ||||||
|     path::{Path, PathBuf}, |     path::{Path, PathBuf}, | ||||||
|     pin::Pin, |     pin::Pin, | ||||||
|     sync::Arc, |     sync::Arc, | ||||||
| }; | }; | ||||||
| use thread_local::ThreadLocal; | use thread_local::ThreadLocal; | ||||||
| use tokio::sync::oneshot::Sender; | use tokio::sync::watch; | ||||||
| use tracing::debug; | use tracing::debug; | ||||||
| 
 | 
 | ||||||
| thread_local! { | thread_local! { | ||||||
|  | @ -126,7 +126,7 @@ impl DatabaseEngine for Engine { | ||||||
| pub struct SqliteTable { | pub struct SqliteTable { | ||||||
|     engine: Arc<Engine>, |     engine: Arc<Engine>, | ||||||
|     name: String, |     name: String, | ||||||
|     watchers: RwLock<HashMap<Vec<u8>, Vec<Sender<()>>>>, |     watchers: RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type TupleOfBytes = (Vec<u8>, Vec<u8>); | type TupleOfBytes = (Vec<u8>, Vec<u8>); | ||||||
|  | @ -215,10 +215,8 @@ impl Tree for SqliteTable { | ||||||
|         if !triggered.is_empty() { |         if !triggered.is_empty() { | ||||||
|             let mut watchers = self.watchers.write(); |             let mut watchers = self.watchers.write(); | ||||||
|             for prefix in triggered { |             for prefix in triggered { | ||||||
|                 if let Some(txs) = watchers.remove(prefix) { |                 if let Some(tx) = watchers.remove(prefix) { | ||||||
|                     for tx in txs { |                     let _ = tx.0.send(()); | ||||||
|                         let _ = tx.send(()); |  | ||||||
|                     } |  | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         }; |         }; | ||||||
|  | @ -367,17 +365,18 @@ impl Tree for SqliteTable { | ||||||
| 
 | 
 | ||||||
|     #[tracing::instrument(skip(self, prefix))] |     #[tracing::instrument(skip(self, prefix))] | ||||||
|     fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { |     fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> { | ||||||
|         let (tx, rx) = tokio::sync::oneshot::channel(); |         let mut rx = match self.watchers.write().entry(prefix.to_vec()) { | ||||||
| 
 |             hash_map::Entry::Occupied(o) => o.get().1.clone(), | ||||||
|         self.watchers |             hash_map::Entry::Vacant(v) => { | ||||||
|             .write() |                 let (tx, rx) = tokio::sync::watch::channel(()); | ||||||
|             .entry(prefix.to_vec()) |                 v.insert((tx, rx.clone())); | ||||||
|             .or_default() |                 rx | ||||||
|             .push(tx); |             } | ||||||
|  |         }; | ||||||
| 
 | 
 | ||||||
|         Box::pin(async move { |         Box::pin(async move { | ||||||
|             // Tx is never destroyed
 |             // Tx is never destroyed
 | ||||||
|             rx.await.unwrap(); |             rx.changed().await.unwrap(); | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue