diff --git a/Cargo.lock b/Cargo.lock index 417e437..347f235 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,6 +135,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.59.1" @@ -234,6 +243,12 @@ dependencies = [ "nom", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -278,6 +293,7 @@ dependencies = [ "bytes", "crossbeam", "directories", + "heed", "http", "image", "jsonwebtoken", @@ -379,7 +395,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -388,12 +404,12 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-channel", "crossbeam-deque", "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", + "crossbeam-queue 0.3.2", + "crossbeam-utils 0.8.5", ] [[package]] @@ -402,8 +418,8 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" dependencies = [ - "cfg-if", - "crossbeam-utils", + "cfg-if 1.0.0", + "crossbeam-utils 0.8.5", ] [[package]] @@ -412,9 +428,9 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.8.5", ] [[package]] @@ -423,21 +439,40 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" dependencies = [ - "cfg-if", - "crossbeam-utils", + "cfg-if 1.0.0", + "crossbeam-utils 0.8.5", "lazy_static", "memoffset", "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" +dependencies = [ + "crossbeam-utils 0.6.6", +] + [[package]] name = "crossbeam-queue" version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" dependencies = [ - "cfg-if", - "crossbeam-utils", + "cfg-if 1.0.0", + "crossbeam-utils 0.8.5", +] + +[[package]] +name = "crossbeam-utils" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" +dependencies = [ + "cfg-if 0.1.10", + "lazy_static", ] [[package]] @@ -446,7 +481,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "lazy_static", ] @@ -610,7 +645,7 @@ version = "0.8.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -828,7 +863,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi 0.9.0+wasi-snapshot-preview1", ] @@ -839,7 +874,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi 0.10.2+wasi-snapshot-preview1", ] @@ -906,6 +941,42 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heed" +version = "0.10.6" +source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" +dependencies = [ + "bytemuck", + "byteorder", + "heed-traits", + "heed-types", + "libc", + "lmdb-rkv-sys", + "once_cell", + "page_size", + "serde", + "synchronoise", + "url", +] + +[[package]] +name = "heed-traits" +version = "0.7.0" +source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" + +[[package]] +name = "heed-types" +version = "0.7.2" +source = "git+https://github.com/Kerollmops/heed.git?rev=b235e9c3e9984737c967b5de1014b48f125dc28b#b235e9c3e9984737c967b5de1014b48f125dc28b" +dependencies = [ + "bincode", + "bytemuck", + "byteorder", + "heed-traits", + "serde", + "serde_json", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1068,7 +1139,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -1190,7 +1261,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "winapi", ] @@ -1223,6 +1294,17 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "lmdb-rkv-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b27470ac25167b3afdfb6af8fcd3bc1be67de50ffbdaf4073378cfded6ae24a5" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "lock_api" version = "0.4.4" @@ -1238,7 +1320,7 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -1247,7 +1329,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2111607c723d7857e0d8299d5ce7a0bf4b844d3e44f8de136b13da513eaf8fc4" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "generator", "scoped-tls", "serde", @@ -1503,6 +1585,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "page_size" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parking_lot" version = "0.11.1" @@ -1520,7 +1612,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "instant", "libc", "redox_syscall", @@ -2320,7 +2412,7 @@ dependencies = [ "base64 0.13.0", "blake2b_simd", "constant_time_eq", - "crossbeam-utils", + "crossbeam-utils 0.8.5", ] [[package]] @@ -2540,7 +2632,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b362ae5752fd2137731f9fa25fd4d9058af34666ca1966fb969119cc35719f12" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", "opaque-debug", @@ -2601,7 +2693,7 @@ checksum = "1d0132f3e393bcb7390c60bb45769498cf4550bcb7a21d7f95c02b69f6362cdc" dependencies = [ "crc32fast", "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.8.5", "fs2", "fxhash", "libc", @@ -2622,7 +2714,7 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "winapi", ] @@ -2751,6 +2843,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "synchronoise" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d717ed0efc9d39ab3b642a096bc369a3e02a38a51c41845d7fe31bdad1d6eaeb" +dependencies = [ + "crossbeam-queue 0.1.2", +] + [[package]] name = "synstructure" version = "0.12.5" @@ -2775,7 +2876,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "rand 0.8.4", "redox_syscall", @@ -3005,7 +3106,7 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3105,7 +3206,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0d7f5db438199a6e2609debe3f69f808d074e0a2888ee0bccb45fe234d03f4" dependencies = [ "async-trait", - "cfg-if", + "cfg-if 1.0.0", "data-encoding", "enum-as-inner", "futures-channel", @@ -3129,7 +3230,7 @@ version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ad17b608a64bd0735e67bde16b0636f8aa8591f831a25d18443ed00a699770" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "futures-util", "ipconfig", "lazy_static", @@ -3290,7 +3391,7 @@ version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "serde", "serde_json", "wasm-bindgen-macro", @@ -3317,7 +3418,7 @@ version = "0.4.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fba7978c679d53ce2d0ac80c8c175840feb849a161664365d1287b41f2e67f1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "wasm-bindgen", "web-sys", diff --git a/Cargo.toml b/Cargo.toml index 92134a4..1d774dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,12 +79,14 @@ parking_lot = { version = "0.11.1", optional = true } crossbeam = { version = "0.8.1", optional = true } num_cpus = "1.13.0" threadpool = "1.8.1" +heed = { git = "https://github.com/Kerollmops/heed.git", rev = "b235e9c3e9984737c967b5de1014b48f125dc28b", optional = true } [features] -default = ["conduit_bin", "backend_sqlite"] +default = ["conduit_bin", "backend_heed"] backend_sled = ["sled"] backend_rocksdb = ["rocksdb"] backend_sqlite = ["sqlite"] +backend_heed = ["heed", "crossbeam"] sqlite = ["rusqlite", "parking_lot", "crossbeam", "tokio/signal"] conduit_bin = [] # TODO: add rocket to this when it is optional diff --git a/src/client_server/to_device.rs b/src/client_server/to_device.rs index e3fd780..d3f7d25 100644 --- a/src/client_server/to_device.rs +++ b/src/client_server/to_device.rs @@ -47,7 +47,7 @@ pub async fn send_event_to_device_route( db.sending.send_reliable_edu( target_user_id.server_name(), - &serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice( + serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice( DirectDeviceContent { sender: sender_user.clone(), ev_type: EventType::from(&body.event_type), diff --git a/src/database.rs b/src/database.rs index 9f24345..65a60f0 100644 --- a/src/database.rs +++ b/src/database.rs @@ -156,6 +156,9 @@ pub type Engine = abstraction::rocksdb::Engine; #[cfg(feature = "sqlite")] pub type Engine = abstraction::sqlite::Engine; +#[cfg(feature = "heed")] +pub type Engine = abstraction::heed::Engine; + pub struct Database { _db: Arc, pub globals: globals::Globals, @@ -188,24 +191,20 @@ impl Database { fn check_sled_or_sqlite_db(config: &Config) -> Result<()> { let path = Path::new(&config.database_path); - #[cfg(feature = "backend_sqlite")] - { - let sled_exists = path.join("db").exists(); - let sqlite_exists = path.join("conduit.db").exists(); - if sled_exists { - if sqlite_exists { - // most likely an in-place directory, only warn - warn!("Both sled and sqlite databases are detected in database directory"); - warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") - } else { - error!( - "Sled database detected, conduit now uses sqlite for database operations" - ); - error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); - return Err(Error::bad_config( - "sled database detected, migrate to sqlite", - )); - } + let sled_exists = path.join("db").exists(); + let sqlite_exists = path.join("conduit.db").exists(); + // TODO: heed + if sled_exists { + if sqlite_exists { + // most likely an in-place directory, only warn + warn!("Both sled and sqlite databases are detected in database directory"); + warn!("Currently running from the sqlite database, but consider removing sled database files to free up space") + } else { + error!("Sled database detected, conduit now uses sqlite for database operations"); + error!("This database must be converted to sqlite, go to https://github.com/ShadowJonathan/conduit_toolbox#conduit_sled_to_sqlite"); + return Err(Error::bad_config( + "sled database detected, migrate to sqlite", + )); } } @@ -313,8 +312,8 @@ impl Database { }, sending: sending::Sending { servername_educount: builder.open_tree("servername_educount")?, - servernamepduids: builder.open_tree("servernamepduids")?, - servercurrentevents: builder.open_tree("servercurrentevents")?, + servernameevent_data: builder.open_tree("servernameevent_data")?, + servercurrentevent_data: builder.open_tree("servercurrentevent_data")?, maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), sender: sending_sender, }, diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index fb11ba0..8ccac78 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -12,6 +12,9 @@ pub mod sled; #[cfg(feature = "sqlite")] pub mod sqlite; +#[cfg(feature = "heed")] +pub mod heed; + pub trait DatabaseEngine: Sized { fn open(config: &Config) -> Result>; fn open_tree(self: &Arc, name: &'static str) -> Result>; diff --git a/src/database/abstraction/heed.rs b/src/database/abstraction/heed.rs new file mode 100644 index 0000000..61e7927 --- /dev/null +++ b/src/database/abstraction/heed.rs @@ -0,0 +1,241 @@ +use super::super::Config; +use crossbeam::channel::{bounded, Sender as ChannelSender}; +use threadpool::ThreadPool; + +use crate::{Error, Result}; +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::{Arc, Mutex, RwLock}, +}; +use tokio::sync::oneshot::Sender; + +use super::{DatabaseEngine, Tree}; + +type TupleOfBytes = (Vec, Vec); + +pub struct Engine { + env: heed::Env, + iter_pool: Mutex, +} + +pub struct EngineTree { + engine: Arc, + tree: Arc, + watchers: RwLock, Vec>>>, +} + +fn convert_error(error: heed::Error) -> Error { + panic!(error.to_string()); + Error::HeedError { + error: error.to_string(), + } +} + +impl DatabaseEngine for Engine { + fn open(config: &Config) -> Result> { + let mut env_builder = heed::EnvOpenOptions::new(); + env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte + env_builder.max_readers(126); + env_builder.max_dbs(128); + unsafe { + env_builder.flag(heed::flags::Flags::MdbNoSync); + env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + } + + Ok(Arc::new(Engine { + env: env_builder + .open(&config.database_path) + .map_err(convert_error)?, + iter_pool: Mutex::new(ThreadPool::new(10)), + })) + } + + fn open_tree(self: &Arc, name: &'static str) -> Result> { + // Creates the db if it doesn't exist already + Ok(Arc::new(EngineTree { + engine: Arc::clone(self), + tree: Arc::new( + self.env + .create_database(Some(name)) + .map_err(convert_error)?, + ), + watchers: RwLock::new(HashMap::new()), + })) + } + + fn flush(self: &Arc) -> Result<()> { + self.env.force_sync().map_err(convert_error)?; + Ok(()) + } +} + +impl EngineTree { + #[tracing::instrument(skip(self, tree, from, backwards))] + fn iter_from_thread( + &self, + tree: Arc, + from: Vec, + backwards: bool, + ) -> Box + Send + Sync> { + let (s, r) = bounded::(5); + let engine = Arc::clone(&self.engine); + + let lock = self.engine.iter_pool.lock().unwrap(); + if lock.active_count() < lock.max_count() { + lock.execute(move || { + iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s); + }); + } else { + std::thread::spawn(move || { + iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s); + }); + } + + Box::new(r.into_iter()) + } +} + +#[tracing::instrument(skip(tree, txn, from, backwards))] +fn iter_from_thread_work( + tree: Arc, + txn: &heed::RoTxn<'_>, + from: Vec, + backwards: bool, + s: &ChannelSender<(Vec, Vec)>, +) { + if backwards { + for (k, v) in tree.rev_range(txn, ..=&*from).unwrap().map(|r| r.unwrap()) { + if s.send((k.to_vec(), v.to_vec())).is_err() { + return; + } + } + } else { + if from.is_empty() { + for (k, v) in tree.iter(txn).unwrap().map(|r| r.unwrap()) { + if s.send((k.to_vec(), v.to_vec())).is_err() { + return; + } + } + } else { + for (k, v) in tree.range(txn, &*from..).unwrap().map(|r| r.unwrap()) { + if s.send((k.to_vec(), v.to_vec())).is_err() { + return; + } + } + } + } +} + +impl Tree for EngineTree { + #[tracing::instrument(skip(self, key))] + fn get(&self, key: &[u8]) -> Result>> { + let txn = self.engine.env.read_txn().map_err(convert_error)?; + Ok(self + .tree + .get(&txn, &key) + .map_err(convert_error)? + .map(|s| s.to_vec())) + } + + #[tracing::instrument(skip(self, key, value))] + fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let mut txn = self.engine.env.write_txn().map_err(convert_error)?; + self.tree + .put(&mut txn, &key, &value) + .map_err(convert_error)?; + txn.commit().map_err(convert_error)?; + + let watchers = self.watchers.read().unwrap(); + let mut triggered = Vec::new(); + + for length in 0..=key.len() { + if watchers.contains_key(&key[..length]) { + triggered.push(&key[..length]); + } + } + + drop(watchers); + + if !triggered.is_empty() { + let mut watchers = self.watchers.write().unwrap(); + for prefix in triggered { + if let Some(txs) = watchers.remove(prefix) { + for tx in txs { + let _ = tx.send(()); + } + } + } + }; + + Ok(()) + } + + #[tracing::instrument(skip(self, key))] + fn remove(&self, key: &[u8]) -> Result<()> { + let mut txn = self.engine.env.write_txn().map_err(convert_error)?; + self.tree.delete(&mut txn, &key).map_err(convert_error)?; + txn.commit().map_err(convert_error)?; + Ok(()) + } + + #[tracing::instrument(skip(self))] + fn iter<'a>(&'a self) -> Box, Vec)> + Send + 'a> { + self.iter_from(&[], false) + } + + #[tracing::instrument(skip(self, from, backwards))] + fn iter_from( + &self, + from: &[u8], + backwards: bool, + ) -> Box, Vec)> + Send> { + self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards) + } + + #[tracing::instrument(skip(self, key))] + fn increment(&self, key: &[u8]) -> Result> { + let mut txn = self.engine.env.write_txn().map_err(convert_error)?; + + let old = self.tree.get(&txn, &key).map_err(convert_error)?; + let new = + crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some"); + + self.tree + .put(&mut txn, &key, &&*new) + .map_err(convert_error)?; + + txn.commit().map_err(convert_error)?; + + Ok(new) + } + + #[tracing::instrument(skip(self, prefix))] + fn scan_prefix<'a>( + &'a self, + prefix: Vec, + ) -> Box, Vec)> + Send + 'a> { + Box::new( + self.iter_from(&prefix, false) + .take_while(move |(key, _)| key.starts_with(&prefix)), + ) + } + + #[tracing::instrument(skip(self, prefix))] + fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin + Send + 'a>> { + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.watchers + .write() + .unwrap() + .entry(prefix.to_vec()) + .or_default() + .push(tx); + + Box::pin(async move { + // Tx is never destroyed + rx.await.unwrap(); + }) + } +} diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 7c38542..8ada87f 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -863,6 +863,7 @@ impl Rooms { if let Some(body) = pdu.content.get("body").and_then(|b| b.as_str()) { for word in body .split_terminator(|c: char| !c.is_alphanumeric()) + .filter(|word| word.len() <= 50) .map(str::to_lowercase) { let mut key = pdu.room_id.as_bytes().to_vec(); diff --git a/src/database/sending.rs b/src/database/sending.rs index f28e883..506bc17 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -81,10 +81,10 @@ pub enum SendingEventType { pub struct Sending { /// The state for a given state hash. pub(super) servername_educount: Arc, // EduCount: Count of last EDU sync - pub(super) servernamepduids: Arc, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId - pub(super) servercurrentevents: Arc, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / (*)EduEvent + pub(super) servernameevent_data: Arc, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / * (for edus), Data = EDU content + pub(super) servercurrentevent_data: Arc, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / * (for edus), Data = EDU content pub(super) maximum_requests: Arc, - pub sender: mpsc::UnboundedSender>, + pub sender: mpsc::UnboundedSender<(Vec, Vec)>, } enum TransactionStatus { @@ -97,7 +97,7 @@ impl Sending { pub fn start_handler( &self, db: Arc>, - mut receiver: mpsc::UnboundedReceiver>, + mut receiver: mpsc::UnboundedReceiver<(Vec, Vec)>, ) { tokio::spawn(async move { let mut futures = FuturesUnordered::new(); @@ -109,16 +109,15 @@ impl Sending { let guard = db.read().await; - for (key, outgoing_kind, event) in - guard - .sending - .servercurrentevents - .iter() - .filter_map(|(key, _)| { - Self::parse_servercurrentevent(&key) - .ok() - .map(|(k, e)| (key, k, e)) - }) + for (key, outgoing_kind, event) in guard + .sending + .servercurrentevent_data + .iter() + .filter_map(|(key, v)| { + Self::parse_servercurrentevent(&key, v) + .ok() + .map(|(k, e)| (key, k, e)) + }) { let entry = initial_transactions .entry(outgoing_kind.clone()) @@ -129,7 +128,7 @@ impl Sending { "Dropping some current events: {:?} {:?} {:?}", key, outgoing_kind, event ); - guard.sending.servercurrentevents.remove(&key).unwrap(); + guard.sending.servercurrentevent_data.remove(&key).unwrap(); continue; } @@ -156,17 +155,17 @@ impl Sending { let guard = db.read().await; let prefix = outgoing_kind.get_prefix(); - for (key, _) in guard.sending.servercurrentevents + for (key, _) in guard.sending.servercurrentevent_data .scan_prefix(prefix.clone()) { - guard.sending.servercurrentevents.remove(&key).unwrap(); + guard.sending.servercurrentevent_data.remove(&key).unwrap(); } // Find events that have been added since starting the last request - let new_events = guard.sending.servernamepduids + let new_events = guard.sending.servernameevent_data .scan_prefix(prefix.clone()) - .filter_map(|(k, _)| { - Self::parse_servercurrentevent(&k).ok().map(|ev| (ev, k)) + .filter_map(|(k, v)| { + Self::parse_servercurrentevent(&k, v).ok().map(|ev| (ev, k)) }) .take(30) .collect::>(); @@ -175,9 +174,10 @@ impl Sending { if !new_events.is_empty() { // Insert pdus we found - for (_, key) in &new_events { - guard.sending.servercurrentevents.insert(&key, &[]).unwrap(); - guard.sending.servernamepduids.remove(&key).unwrap(); + for (e, key) in &new_events { + let value = if let SendingEventType::Edu(value) = &e.1 { &**value } else { &[] }; + guard.sending.servercurrentevent_data.insert(&key, value).unwrap(); + guard.sending.servernameevent_data.remove(&key).unwrap(); } drop(guard); @@ -205,8 +205,8 @@ impl Sending { } }; }, - Some(key) = receiver.next() => { - if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key) { + Some((key, value)) = receiver.next() => { + if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key, value) { let guard = db.read().await; if let Ok(Some(events)) = Self::select_events( @@ -267,18 +267,25 @@ impl Sending { if retry { // We retry the previous transaction - for (key, _) in db.sending.servercurrentevents.scan_prefix(prefix) { - if let Ok((_, e)) = Self::parse_servercurrentevent(&key) { + for (key, value) in db.sending.servercurrentevent_data.scan_prefix(prefix) { + if let Ok((_, e)) = Self::parse_servercurrentevent(&key, value) { events.push(e); } } } else { for (e, full_key) in new_events { - db.sending.servercurrentevents.insert(&full_key, &[])?; + let value = if let SendingEventType::Edu(value) = &e { + &**value + } else { + &[][..] + }; + db.sending + .servercurrentevent_data + .insert(&full_key, value)?; // If it was a PDU we have to unqueue it // TODO: don't try to unqueue EDUs - db.sending.servernamepduids.remove(&full_key)?; + db.sending.servernameevent_data.remove(&full_key)?; events.push(e); } @@ -380,8 +387,8 @@ impl Sending { key.extend_from_slice(&senderkey); key.push(0xff); key.extend_from_slice(pdu_id); - self.servernamepduids.insert(&key, b"")?; - self.sender.unbounded_send(key).unwrap(); + self.servernameevent_data.insert(&key, &[])?; + self.sender.unbounded_send((key, vec![])).unwrap(); Ok(()) } @@ -391,20 +398,19 @@ impl Sending { let mut key = server.as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(pdu_id); - self.servernamepduids.insert(&key, b"")?; - self.sender.unbounded_send(key).unwrap(); + self.servernameevent_data.insert(&key, &[])?; + self.sender.unbounded_send((key, vec![])).unwrap(); Ok(()) } #[tracing::instrument(skip(self, server, serialized))] - pub fn send_reliable_edu(&self, server: &ServerName, serialized: &[u8]) -> Result<()> { + pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec) -> Result<()> { let mut key = server.as_bytes().to_vec(); key.push(0xff); key.push(b'*'); - key.extend_from_slice(serialized); - self.servernamepduids.insert(&key, b"")?; - self.sender.unbounded_send(key).unwrap(); + self.servernameevent_data.insert(&key, &serialized)?; + self.sender.unbounded_send((key, serialized)).unwrap(); Ok(()) } @@ -415,8 +421,8 @@ impl Sending { key.extend_from_slice(appservice_id.as_bytes()); key.push(0xff); key.extend_from_slice(pdu_id); - self.servernamepduids.insert(&key, b"")?; - self.sender.unbounded_send(key).unwrap(); + self.servernameevent_data.insert(&key, &[])?; + self.sender.unbounded_send((key, vec![])).unwrap(); Ok(()) } @@ -451,7 +457,7 @@ impl Sending { ( kind.clone(), Error::bad_database( - "[Appservice] Event in servernamepduids not found in db.", + "[Appservice] Event in servernameevent_data not found in db.", ), ) })? @@ -508,7 +514,7 @@ impl Sending { ( kind.clone(), Error::bad_database( - "[Push] Event in servernamepduids not found in db.", + "[Push] Event in servernamevent_datas not found in db.", ), ) })?, @@ -602,7 +608,7 @@ impl Sending { ( OutgoingKind::Normal(server.clone()), Error::bad_database( - "[Normal] Event in servernamepduids not found in db.", + "[Normal] Event in servernamevent_datas not found in db.", ), ) })?, @@ -662,7 +668,10 @@ impl Sending { } #[tracing::instrument(skip(key))] - fn parse_servercurrentevent(key: &[u8]) -> Result<(OutgoingKind, SendingEventType)> { + fn parse_servercurrentevent( + key: &[u8], + value: Vec, + ) -> Result<(OutgoingKind, SendingEventType)> { // Appservices start with a plus Ok::<_, Error>(if key.starts_with(b"+") { let mut parts = key[1..].splitn(2, |&b| b == 0xff); @@ -680,7 +689,7 @@ impl Sending { Error::bad_database("Invalid server string in server_currenttransaction") })?), if event.starts_with(b"*") { - SendingEventType::Edu(event[1..].to_vec()) + SendingEventType::Edu(value.to_vec()) } else { SendingEventType::Pdu(event.to_vec()) }, diff --git a/src/error.rs b/src/error.rs index eda522a..24e52ec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -42,6 +42,9 @@ pub enum Error { #[from] source: rusqlite::Error, }, + #[cfg(feature = "heed")] + #[error("There was a problem with the connection to the heed database: {error}")] + HeedError { error: String }, #[error("Could not generate an image.")] ImageError { #[from]