From 632a1343eb900f05714621ed8b0d752523f54f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sun, 29 Aug 2021 20:00:02 +0200 Subject: [PATCH] fix: make appservices more efficient --- src/client_server/sync.rs | 13 +---- src/database.rs | 3 +- src/database/abstraction/sqlite.rs | 21 ++++++-- src/database/rooms.rs | 87 ++++++++++++++++++++++++------ src/server_server.rs | 51 ++++++++---------- 5 files changed, 116 insertions(+), 59 deletions(-) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index d6e32ea..f7f2454 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -246,12 +246,6 @@ async fn sync_helper( .current_shortstatehash(&room_id)? .expect("All rooms have state"); - let pdus_after_since = db - .rooms - .pdus_after(&sender_user, &room_id, since)? - .next() - .is_some(); - let since_shortstatehash = db.rooms.get_token_shortstatehash(&room_id, since)?; // Calculates joined_member_count, invited_member_count and heroes @@ -341,7 +335,7 @@ async fn sync_helper( true, state_events, ) - } else if !pdus_after_since && since_shortstatehash == Some(current_shortstatehash) { + } else if timeline_pdus.len() == 0 && since_shortstatehash == Some(current_shortstatehash) { // No state changes (Vec::new(), None, None, false, Vec::new()) } else { @@ -401,10 +395,7 @@ async fn sync_helper( let send_member_count = state_events .iter() - .any(|event| event.kind == EventType::RoomMember) - || timeline_pdus.iter().any(|(_, event)| { - event.state_key.is_some() && event.kind == EventType::RoomMember - }); + .any(|event| event.kind == EventType::RoomMember); if encrypted_room { for state_event in &state_events { diff --git a/src/database.rs b/src/database.rs index ca3d2f0..ca0ed88 100644 --- a/src/database.rs +++ b/src/database.rs @@ -287,8 +287,9 @@ impl Database { eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), - stateinfo_cache: Mutex::new(LruCache::new(1000)), our_real_users_cache: RwLock::new(HashMap::new()), + appservice_in_room_cache: RwLock::new(HashMap::new()), + stateinfo_cache: Mutex::new(LruCache::new(1000)), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 3a4623f..f17eac9 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -148,6 +148,7 @@ type TupleOfBytes = (Vec, Vec); impl SqliteTable { #[tracing::instrument(skip(self, guard, key))] fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result>> { + //dbg!(&self.name); Ok(guard .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? .query_row([key], |row| row.get(0)) @@ -156,6 +157,7 @@ impl SqliteTable { #[tracing::instrument(skip(self, guard, key, value))] fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> { + //dbg!(&self.name); guard.execute( format!( "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", @@ -182,11 +184,16 @@ impl SqliteTable { let statement_ref = NonAliasingBox(statement); + //let name = self.name.clone(); + let iterator = Box::new( statement .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(|r| r.unwrap()), + .map(move |r| { + //dbg!(&name); + r.unwrap() + }), ); Box::new(PreparedStatementIterator { @@ -294,6 +301,8 @@ impl Tree for SqliteTable { let guard = self.engine.read_lock_iterator(); let from = from.to_vec(); // TODO change interface? + //let name = self.name.clone(); + if backwards { let statement = Box::leak(Box::new( guard @@ -310,7 +319,10 @@ impl Tree for SqliteTable { statement .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(|r| r.unwrap()), + .map(move |r| { + //dbg!(&name); + r.unwrap() + }), ); Box::new(PreparedStatementIterator { iterator, @@ -332,7 +344,10 @@ impl Tree for SqliteTable { statement .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(|r| r.unwrap()), + .map(move |r| { + //dbg!(&name); + r.unwrap() + }), ); Box::new(PreparedStatementIterator { diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 8eb9b30..59ed950 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -103,6 +103,7 @@ pub struct Rooms { pub(super) statekeyshort_cache: Mutex>, pub(super) shortstatekey_cache: Mutex>, pub(super) our_real_users_cache: RwLock>>>, + pub(super) appservice_in_room_cache: RwLock>>, pub(super) stateinfo_cache: Mutex< LruCache< u64, @@ -2110,6 +2111,11 @@ impl Rooms { } for appservice in db.appservice.all()? { + if self.appservice_in_room(room_id, &appservice, db)? { + db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; + continue; + } + if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces .get("users") @@ -2133,17 +2139,6 @@ impl Rooms { .get("rooms") .and_then(|rooms| rooms.as_sequence()); - let bridge_user_id = appservice - .1 - .get("sender_localpart") - .and_then(|string| string.as_str()) - .and_then(|string| { - UserId::parse_with_server_name(string, db.globals.server_name()).ok() - }); - - let user_is_joined = - |bridge_user_id| self.is_joined(&bridge_user_id, room_id).unwrap_or(false); - let matching_users = |users: &Regex| { users.is_match(pdu.sender.as_str()) || pdu.kind == EventType::RoomMember @@ -2151,9 +2146,6 @@ impl Rooms { .state_key .as_ref() .map_or(false, |state_key| users.is_match(&state_key)) - || self.room_members(&room_id).any(|userid| { - userid.map_or(false, |userid| users.is_match(userid.as_str())) - }) }; let matching_aliases = |aliases: &Regex| { self.room_aliases(&room_id) @@ -2161,8 +2153,7 @@ impl Rooms { .any(|room_alias| aliases.is_match(room_alias.as_str())) }; - if bridge_user_id.map_or(false, user_is_joined) - || aliases.iter().any(matching_aliases) + if aliases.iter().any(matching_aliases) || rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into())) || users.iter().any(matching_users) { @@ -2579,6 +2570,11 @@ impl Rooms { self.serverroomids.insert(&serverroom_id, &[])?; } + self.appservice_in_room_cache + .write() + .unwrap() + .remove(room_id); + Ok(()) } @@ -2608,6 +2604,65 @@ impl Rooms { } } + #[tracing::instrument(skip(self, room_id, appservice, db))] + pub fn appservice_in_room( + &self, + room_id: &RoomId, + appservice: &(String, serde_yaml::Value), + db: &Database, + ) -> Result { + let maybe = self + .appservice_in_room_cache + .read() + .unwrap() + .get(room_id) + .and_then(|map| map.get(&appservice.0)) + .copied(); + + if let Some(b) = maybe { + Ok(b) + } else { + if let Some(namespaces) = appservice.1.get("namespaces") { + let users = namespaces + .get("users") + .and_then(|users| users.as_sequence()) + .map_or_else(Vec::new, |users| { + users + .iter() + .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) + .collect::>() + }); + + let bridge_user_id = appservice + .1 + .get("sender_localpart") + .and_then(|string| string.as_str()) + .and_then(|string| { + UserId::parse_with_server_name(string, db.globals.server_name()).ok() + }); + + let in_room = bridge_user_id + .map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false)) + || self.room_members(&room_id).any(|userid| { + userid.map_or(false, |userid| { + users.iter().any(|r| r.is_match(userid.as_str())) + }) + }); + + self.appservice_in_room_cache + .write() + .unwrap() + .entry(room_id.clone()) + .or_default() + .insert(appservice.0.clone(), in_room); + + Ok(in_room) + } else { + Ok(false) + } + } + } + #[tracing::instrument(skip(self, db))] pub async fn leave_room( &self, diff --git a/src/server_server.rs b/src/server_server.rs index bb83ac0..f5210db 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -48,7 +48,7 @@ use ruma::{ state_res::{self, RoomVersion, StateMap}, to_device::DeviceIdOrAllDevices, uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, - ServerSigningKeyId, UserId, + ServerSigningKeyId, }; use std::{ collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet}, @@ -2017,6 +2017,11 @@ fn append_incoming_pdu( )?; for appservice in db.appservice.all()? { + if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { + db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; + continue; + } + if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces .get("users") @@ -2029,45 +2034,35 @@ fn append_incoming_pdu( }); let aliases = namespaces .get("aliases") - .and_then(|users| users.get("regex")) - .and_then(|regex| regex.as_str()) - .and_then(|regex| Regex::new(regex).ok()); + .and_then(|aliases| aliases.as_sequence()) + .map_or_else(Vec::new, |aliases| { + aliases + .iter() + .filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok()) + .collect::>() + }); let rooms = namespaces .get("rooms") .and_then(|rooms| rooms.as_sequence()); - let room_aliases = db.rooms.room_aliases(&pdu.room_id); - - let bridge_user_id = appservice - .1 - .get("sender_localpart") - .and_then(|string| string.as_str()) - .and_then(|string| { - UserId::parse_with_server_name(string, db.globals.server_name()).ok() - }); - - #[allow(clippy::blocks_in_if_conditions)] - if bridge_user_id.map_or(false, |bridge_user_id| { - db.rooms - .is_joined(&bridge_user_id, &pdu.room_id) - .unwrap_or(false) - }) || users.iter().any(|users| { + let matching_users = |users: &Regex| { users.is_match(pdu.sender.as_str()) || pdu.kind == EventType::RoomMember && pdu .state_key .as_ref() .map_or(false, |state_key| users.is_match(&state_key)) - }) || aliases.map_or(false, |aliases| { - room_aliases + }; + let matching_aliases = |aliases: &Regex| { + db.rooms + .room_aliases(&pdu.room_id) .filter_map(|r| r.ok()) .any(|room_alias| aliases.is_match(room_alias.as_str())) - }) || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) - || db - .rooms - .room_members(&pdu.room_id) - .filter_map(|r| r.ok()) - .any(|member| users.iter().any(|regex| regex.is_match(member.as_str()))) + }; + + if aliases.iter().any(matching_aliases) + || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) + || users.iter().any(matching_users) { db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; }