From 632a1343eb900f05714621ed8b0d752523f54f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sun, 29 Aug 2021 20:00:02 +0200 Subject: [PATCH 1/3] fix: make appservices more efficient --- src/client_server/sync.rs | 13 +---- src/database.rs | 3 +- src/database/abstraction/sqlite.rs | 21 ++++++-- src/database/rooms.rs | 87 ++++++++++++++++++++++++------ src/server_server.rs | 51 ++++++++---------- 5 files changed, 116 insertions(+), 59 deletions(-) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index d6e32ea..f7f2454 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -246,12 +246,6 @@ async fn sync_helper( .current_shortstatehash(&room_id)? .expect("All rooms have state"); - let pdus_after_since = db - .rooms - .pdus_after(&sender_user, &room_id, since)? - .next() - .is_some(); - let since_shortstatehash = db.rooms.get_token_shortstatehash(&room_id, since)?; // Calculates joined_member_count, invited_member_count and heroes @@ -341,7 +335,7 @@ async fn sync_helper( true, state_events, ) - } else if !pdus_after_since && since_shortstatehash == Some(current_shortstatehash) { + } else if timeline_pdus.len() == 0 && since_shortstatehash == Some(current_shortstatehash) { // No state changes (Vec::new(), None, None, false, Vec::new()) } else { @@ -401,10 +395,7 @@ async fn sync_helper( let send_member_count = state_events .iter() - .any(|event| event.kind == EventType::RoomMember) - || timeline_pdus.iter().any(|(_, event)| { - event.state_key.is_some() && event.kind == EventType::RoomMember - }); + .any(|event| event.kind == EventType::RoomMember); if encrypted_room { for state_event in &state_events { diff --git a/src/database.rs b/src/database.rs index ca3d2f0..ca0ed88 100644 --- a/src/database.rs +++ b/src/database.rs @@ -287,8 +287,9 @@ impl Database { eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), - stateinfo_cache: Mutex::new(LruCache::new(1000)), our_real_users_cache: RwLock::new(HashMap::new()), + appservice_in_room_cache: RwLock::new(HashMap::new()), + stateinfo_cache: Mutex::new(LruCache::new(1000)), }, account_data: account_data::AccountData { roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index 3a4623f..f17eac9 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -148,6 +148,7 @@ type TupleOfBytes = (Vec, Vec); impl SqliteTable { #[tracing::instrument(skip(self, guard, key))] fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result>> { + //dbg!(&self.name); Ok(guard .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? .query_row([key], |row| row.get(0)) @@ -156,6 +157,7 @@ impl SqliteTable { #[tracing::instrument(skip(self, guard, key, value))] fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> { + //dbg!(&self.name); guard.execute( format!( "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", @@ -182,11 +184,16 @@ impl SqliteTable { let statement_ref = NonAliasingBox(statement); + //let name = self.name.clone(); + let iterator = Box::new( statement .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(|r| r.unwrap()), + .map(move |r| { + //dbg!(&name); + r.unwrap() + }), ); Box::new(PreparedStatementIterator { @@ -294,6 +301,8 @@ impl Tree for SqliteTable { let guard = self.engine.read_lock_iterator(); let from = from.to_vec(); // TODO change interface? + //let name = self.name.clone(); + if backwards { let statement = Box::leak(Box::new( guard @@ -310,7 +319,10 @@ impl Tree for SqliteTable { statement .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(|r| r.unwrap()), + .map(move |r| { + //dbg!(&name); + r.unwrap() + }), ); Box::new(PreparedStatementIterator { iterator, @@ -332,7 +344,10 @@ impl Tree for SqliteTable { statement .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .unwrap() - .map(|r| r.unwrap()), + .map(move |r| { + //dbg!(&name); + r.unwrap() + }), ); Box::new(PreparedStatementIterator { diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 8eb9b30..59ed950 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -103,6 +103,7 @@ pub struct Rooms { pub(super) statekeyshort_cache: Mutex>, pub(super) shortstatekey_cache: Mutex>, pub(super) our_real_users_cache: RwLock>>>, + pub(super) appservice_in_room_cache: RwLock>>, pub(super) stateinfo_cache: Mutex< LruCache< u64, @@ -2110,6 +2111,11 @@ impl Rooms { } for appservice in db.appservice.all()? { + if self.appservice_in_room(room_id, &appservice, db)? { + db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; + continue; + } + if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces .get("users") @@ -2133,17 +2139,6 @@ impl Rooms { .get("rooms") .and_then(|rooms| rooms.as_sequence()); - let bridge_user_id = appservice - .1 - .get("sender_localpart") - .and_then(|string| string.as_str()) - .and_then(|string| { - UserId::parse_with_server_name(string, db.globals.server_name()).ok() - }); - - let user_is_joined = - |bridge_user_id| self.is_joined(&bridge_user_id, room_id).unwrap_or(false); - let matching_users = |users: &Regex| { users.is_match(pdu.sender.as_str()) || pdu.kind == EventType::RoomMember @@ -2151,9 +2146,6 @@ impl Rooms { .state_key .as_ref() .map_or(false, |state_key| users.is_match(&state_key)) - || self.room_members(&room_id).any(|userid| { - userid.map_or(false, |userid| users.is_match(userid.as_str())) - }) }; let matching_aliases = |aliases: &Regex| { self.room_aliases(&room_id) @@ -2161,8 +2153,7 @@ impl Rooms { .any(|room_alias| aliases.is_match(room_alias.as_str())) }; - if bridge_user_id.map_or(false, user_is_joined) - || aliases.iter().any(matching_aliases) + if aliases.iter().any(matching_aliases) || rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into())) || users.iter().any(matching_users) { @@ -2579,6 +2570,11 @@ impl Rooms { self.serverroomids.insert(&serverroom_id, &[])?; } + self.appservice_in_room_cache + .write() + .unwrap() + .remove(room_id); + Ok(()) } @@ -2608,6 +2604,65 @@ impl Rooms { } } + #[tracing::instrument(skip(self, room_id, appservice, db))] + pub fn appservice_in_room( + &self, + room_id: &RoomId, + appservice: &(String, serde_yaml::Value), + db: &Database, + ) -> Result { + let maybe = self + .appservice_in_room_cache + .read() + .unwrap() + .get(room_id) + .and_then(|map| map.get(&appservice.0)) + .copied(); + + if let Some(b) = maybe { + Ok(b) + } else { + if let Some(namespaces) = appservice.1.get("namespaces") { + let users = namespaces + .get("users") + .and_then(|users| users.as_sequence()) + .map_or_else(Vec::new, |users| { + users + .iter() + .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) + .collect::>() + }); + + let bridge_user_id = appservice + .1 + .get("sender_localpart") + .and_then(|string| string.as_str()) + .and_then(|string| { + UserId::parse_with_server_name(string, db.globals.server_name()).ok() + }); + + let in_room = bridge_user_id + .map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false)) + || self.room_members(&room_id).any(|userid| { + userid.map_or(false, |userid| { + users.iter().any(|r| r.is_match(userid.as_str())) + }) + }); + + self.appservice_in_room_cache + .write() + .unwrap() + .entry(room_id.clone()) + .or_default() + .insert(appservice.0.clone(), in_room); + + Ok(in_room) + } else { + Ok(false) + } + } + } + #[tracing::instrument(skip(self, db))] pub async fn leave_room( &self, diff --git a/src/server_server.rs b/src/server_server.rs index bb83ac0..f5210db 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -48,7 +48,7 @@ use ruma::{ state_res::{self, RoomVersion, StateMap}, to_device::DeviceIdOrAllDevices, uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, - ServerSigningKeyId, UserId, + ServerSigningKeyId, }; use std::{ collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet}, @@ -2017,6 +2017,11 @@ fn append_incoming_pdu( )?; for appservice in db.appservice.all()? { + if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { + db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; + continue; + } + if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces .get("users") @@ -2029,45 +2034,35 @@ fn append_incoming_pdu( }); let aliases = namespaces .get("aliases") - .and_then(|users| users.get("regex")) - .and_then(|regex| regex.as_str()) - .and_then(|regex| Regex::new(regex).ok()); + .and_then(|aliases| aliases.as_sequence()) + .map_or_else(Vec::new, |aliases| { + aliases + .iter() + .filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok()) + .collect::>() + }); let rooms = namespaces .get("rooms") .and_then(|rooms| rooms.as_sequence()); - let room_aliases = db.rooms.room_aliases(&pdu.room_id); - - let bridge_user_id = appservice - .1 - .get("sender_localpart") - .and_then(|string| string.as_str()) - .and_then(|string| { - UserId::parse_with_server_name(string, db.globals.server_name()).ok() - }); - - #[allow(clippy::blocks_in_if_conditions)] - if bridge_user_id.map_or(false, |bridge_user_id| { - db.rooms - .is_joined(&bridge_user_id, &pdu.room_id) - .unwrap_or(false) - }) || users.iter().any(|users| { + let matching_users = |users: &Regex| { users.is_match(pdu.sender.as_str()) || pdu.kind == EventType::RoomMember && pdu .state_key .as_ref() .map_or(false, |state_key| users.is_match(&state_key)) - }) || aliases.map_or(false, |aliases| { - room_aliases + }; + let matching_aliases = |aliases: &Regex| { + db.rooms + .room_aliases(&pdu.room_id) .filter_map(|r| r.ok()) .any(|room_alias| aliases.is_match(room_alias.as_str())) - }) || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) - || db - .rooms - .room_members(&pdu.room_id) - .filter_map(|r| r.ok()) - .any(|member| users.iter().any(|regex| regex.is_match(member.as_str()))) + }; + + if aliases.iter().any(matching_aliases) + || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) + || users.iter().any(matching_users) { db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; } From d57c19802d8c3502c1aa739f0615b0f3aa448266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 30 Aug 2021 10:46:36 +0200 Subject: [PATCH 2/3] improvement: don't do state updates if the event was soft failed --- src/server_server.rs | 71 +++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index f5210db..42d8185 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1497,6 +1497,47 @@ async fn upgrade_outlier_to_timeline_pdu( ) .map_err(|_| "Failed to get_auth_events.".to_owned())?; + let state_ids_compressed = state_at_incoming_event + .iter() + .map(|(shortstatekey, id)| { + db.rooms + .compress_state_event(*shortstatekey, &id, &db.globals) + .map_err(|_| "Failed to compress_state_event".to_owned()) + }) + .collect::>()?; + + // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it + debug!("starting soft fail auth check"); + + let soft_fail = !state_res::event_auth::auth_check( + &room_version, + &incoming_pdu, + previous_create, + None, + |k, s| auth_events.get(&(k.clone(), s.to_owned())).map(Arc::clone), + ) + .map_err(|_e| "Auth check failed.".to_owned())?; + + if soft_fail { + append_incoming_pdu( + &db, + &incoming_pdu, + val, + extremities, + state_ids_compressed, + soft_fail, + &state_lock + ) + .map_err(|_| "Failed to add pdu to db.".to_owned())?; + + // Soft fail, we keep the event as an outlier but don't add it to the timeline + warn!("Event was soft failed: {:?}", incoming_pdu); + db.rooms + .mark_event_soft_failed(&incoming_pdu.event_id) + .map_err(|_| "Failed to set soft failed flag".to_owned())?; + return Err("Event has been soft failed".into()); + } + if incoming_pdu.state_key.is_some() { let mut extremity_sstatehashes = HashMap::new(); @@ -1651,31 +1692,10 @@ async fn upgrade_outlier_to_timeline_pdu( extremities.insert(incoming_pdu.event_id.clone()); - // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it - debug!("starting soft fail auth check"); - - let soft_fail = !state_res::event_auth::auth_check( - &room_version, - &incoming_pdu, - previous_create, - None, - |k, s| auth_events.get(&(k.clone(), s.to_owned())).map(Arc::clone), - ) - .map_err(|_e| "Auth check failed.".to_owned())?; - // Now that the event has passed all auth it is added into the timeline. // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. - let state_ids_compressed = state_at_incoming_event - .iter() - .map(|(shortstatekey, id)| { - db.rooms - .compress_state_event(*shortstatekey, &id, &db.globals) - .map_err(|_| "Failed to compress_state_event".to_owned()) - }) - .collect::>()?; - let pdu_id = append_incoming_pdu( &db, &incoming_pdu, @@ -1689,15 +1709,6 @@ async fn upgrade_outlier_to_timeline_pdu( debug!("Appended incoming pdu."); - if soft_fail { - // Soft fail, we keep the event as an outlier but don't add it to the timeline - warn!("Event was soft failed: {:?}", incoming_pdu); - db.rooms - .mark_event_soft_failed(&incoming_pdu.event_id) - .map_err(|_| "Failed to set soft failed flag".to_owned())?; - return Err("Event has been soft failed".into()); - } - // Event has passed all auth/stateres checks drop(state_lock); Ok(pdu_id) From 1b25e78e3ae6d60e73cea1c50af802cdf80045c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 30 Aug 2021 10:56:41 +0200 Subject: [PATCH 3/3] fix: inviting dendrite users --- src/client_server/membership.rs | 16 ++++++++++++++-- src/server_server.rs | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 29926e3..0a7ca81 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -973,6 +973,14 @@ pub async fn invite_helper<'a>( (room_version_id, pdu_json, invite_room_state) }; + // Generate event id + let expected_event_id = EventId::try_from(&*format!( + "${}", + ruma::signatures::reference_hash(&pdu_json, &room_version_id) + .expect("ruma can calculate reference hashes") + )) + .expect("ruma's reference hashes are valid event ids"); + let response = db .sending .send_federation_request( @@ -980,9 +988,9 @@ pub async fn invite_helper<'a>( user_id.server_name(), create_invite::v2::Request { room_id: room_id.clone(), - event_id: ruma::event_id!("$receivingservershouldsetthis"), + event_id: expected_event_id.clone(), room_version: room_version_id, - event: PduEvent::convert_to_outgoing_federation_event(pdu_json), + event: PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()), invite_room_state, }, ) @@ -1002,6 +1010,10 @@ pub async fn invite_helper<'a>( } }; + if expected_event_id != event_id { + warn!("Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}", user_id.server_name(), pdu_json, value); + } + let origin = serde_json::from_value::>( serde_json::to_value(value.get("origin").ok_or(Error::BadRequest( ErrorKind::InvalidParam, diff --git a/src/server_server.rs b/src/server_server.rs index 42d8185..3682a49 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1526,7 +1526,7 @@ async fn upgrade_outlier_to_timeline_pdu( extremities, state_ids_compressed, soft_fail, - &state_lock + &state_lock, ) .map_err(|_| "Failed to add pdu to db.".to_owned())?;