diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index d1b7b5d..ffdd34a 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -4,14 +4,14 @@ use parking_lot::{Mutex, MutexGuard, RwLock}; use rusqlite::{Connection, DatabaseName::Main, OptionalExtension}; use std::{ cell::RefCell, - collections::HashMap, + collections::{hash_map, HashMap}, future::Future, path::{Path, PathBuf}, pin::Pin, sync::Arc, }; use thread_local::ThreadLocal; -use tokio::sync::oneshot::Sender; +use tokio::sync::watch; use tracing::debug; thread_local! { @@ -126,7 +126,7 @@ impl DatabaseEngine for Engine { pub struct SqliteTable { engine: Arc, name: String, - watchers: RwLock, Vec>>>, + watchers: RwLock, (watch::Sender<()>, watch::Receiver<()>)>>, } type TupleOfBytes = (Vec, Vec); @@ -215,10 +215,8 @@ impl Tree for SqliteTable { if !triggered.is_empty() { let mut watchers = self.watchers.write(); for prefix in triggered { - if let Some(txs) = watchers.remove(prefix) { - for tx in txs { - let _ = tx.send(()); - } + if let Some(tx) = watchers.remove(prefix) { + let _ = tx.0.send(()); } } }; @@ -367,17 +365,18 @@ impl Tree for SqliteTable { #[tracing::instrument(skip(self, prefix))] fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - self.watchers - .write() - .entry(prefix.to_vec()) - .or_default() - .push(tx); + let mut rx = match self.watchers.write().entry(prefix.to_vec()) { + hash_map::Entry::Occupied(o) => o.get().1.clone(), + hash_map::Entry::Vacant(v) => { + let (tx, rx) = tokio::sync::watch::channel(()); + v.insert((tx, rx.clone())); + rx + } + }; Box::pin(async move { // Tx is never destroyed - rx.await.unwrap(); + rx.changed().await.unwrap(); }) }