From 6924dfc8ea56d8e8347b78364480ea2fce5a5905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 6 Feb 2021 15:27:43 +0100 Subject: [PATCH] improvement: better appservice compatibility and optimizations --- src/appservice_server.rs | 2 +- src/client_server/state.rs | 2 +- src/client_server/sync.rs | 404 ++++++++++++++++++++----------------- src/database.rs | 7 +- src/database/rooms.rs | 4 + src/database/sending.rs | 3 +- src/main.rs | 32 ++- src/ruma_wrapper.rs | 19 +- src/server_server.rs | 4 +- 9 files changed, 270 insertions(+), 207 deletions(-) diff --git a/src/appservice_server.rs b/src/appservice_server.rs index 986909b..ec504b5 100644 --- a/src/appservice_server.rs +++ b/src/appservice_server.rs @@ -1,6 +1,6 @@ use crate::{utils, Error, Result}; use http::header::{HeaderValue, CONTENT_TYPE}; -use log::warn; +use log::{info, warn}; use ruma::api::OutgoingRequest; use std::{ convert::{TryFrom, TryInto}, diff --git a/src/client_server/state.rs b/src/client_server/state.rs index cecb79d..faa415d 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -234,7 +234,7 @@ pub async fn get_state_events_for_empty_key_route( .1; Ok(get_state_events_for_empty_key::Response { - content: serde_json::value::to_raw_value(&event) + content: serde_json::value::to_raw_value(&event.content) .map_err(|_| Error::bad_database("Invalid event content in database"))?, } .into()) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 3136116..494c773 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -95,15 +95,7 @@ pub async fn sync_events_route( // Database queries: - let current_state = db.rooms.room_state_full(&room_id)?; - let current_members = current_state - .iter() - .filter(|(key, _)| key.0 == EventType::RoomMember) - .map(|(key, value)| (&key.1, value)) // Only keep state key - .collect::>(); - let encrypted_room = current_state - .get(&(EventType::RoomEncryption, "".to_owned())) - .is_some(); + let current_state_hash = db.rooms.current_state_hash(&room_id)?; // These type is Option>. The outer Option is None when there is no event between // since and the current room state, meaning there should be no updates. @@ -115,69 +107,85 @@ pub async fn sync_events_route( .as_ref() .map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?); - let since_state = since_state_hash.as_ref().map(|state_hash| { - state_hash - .as_ref() - .and_then(|state_hash| db.rooms.state_full(&room_id, &state_hash).ok()) - }); + let ( + heroes, + joined_member_count, + invited_member_count, + joined_since_last_sync, + state_events, + ) = if since_state_hash != None && Some(¤t_state_hash) != since_state_hash.as_ref() { + let current_state = db.rooms.room_state_full(&room_id)?; + let current_members = current_state + .iter() + .filter(|(key, _)| key.0 == EventType::RoomMember) + .map(|(key, value)| (&key.1, value)) // Only keep state key + .collect::>(); + let encrypted_room = current_state + .get(&(EventType::RoomEncryption, "".to_owned())) + .is_some(); + let since_state = since_state_hash.as_ref().map(|state_hash| { + state_hash + .as_ref() + .and_then(|state_hash| db.rooms.state_full(&room_id, &state_hash).ok()) + }); - let since_encryption = since_state.as_ref().map(|state| { - state - .as_ref() - .map(|state| state.get(&(EventType::RoomEncryption, "".to_owned()))) - }); - - // Calculations: - let new_encrypted_room = - encrypted_room && since_encryption.map_or(false, |encryption| encryption.is_none()); - - let send_member_count = since_state.as_ref().map_or(false, |since_state| { - since_state.as_ref().map_or(true, |since_state| { - current_members.len() - != since_state - .iter() - .filter(|(key, _)| key.0 == EventType::RoomMember) - .count() - }) - }); - - let since_sender_member = since_state.as_ref().map(|since_state| { - since_state.as_ref().and_then(|state| { + let since_encryption = since_state.as_ref().map(|state| { state - .get(&(EventType::RoomMember, sender_user.as_str().to_owned())) - .and_then(|pdu| { - serde_json::from_value::< + .as_ref() + .map(|state| state.get(&(EventType::RoomEncryption, "".to_owned()))) + }); + + // Calculations: + let new_encrypted_room = + encrypted_room && since_encryption.map_or(false, |encryption| encryption.is_none()); + + let send_member_count = since_state.as_ref().map_or(false, |since_state| { + since_state.as_ref().map_or(true, |since_state| { + current_members.len() + != since_state + .iter() + .filter(|(key, _)| key.0 == EventType::RoomMember) + .count() + }) + }); + + let since_sender_member = since_state.as_ref().map(|since_state| { + since_state.as_ref().and_then(|state| { + state + .get(&(EventType::RoomMember, sender_user.as_str().to_owned())) + .and_then(|pdu| { + serde_json::from_value::< Raw, >(pdu.content.clone()) .expect("Raw::from_value always works") .deserialize() .map_err(|_| Error::bad_database("Invalid PDU in database.")) .ok() - }) - }) - }); + }) + }) + }); - if encrypted_room { - for (user_id, current_member) in current_members { - let current_membership = serde_json::from_value::< - Raw, - >(current_member.content.clone()) - .expect("Raw::from_value always works") - .deserialize() - .map_err(|_| Error::bad_database("Invalid PDU in database."))? - .membership; + if encrypted_room { + for (user_id, current_member) in current_members { + let current_membership = serde_json::from_value::< + Raw, + >(current_member.content.clone()) + .expect("Raw::from_value always works") + .deserialize() + .map_err(|_| Error::bad_database("Invalid PDU in database."))? + .membership; - let since_membership = - since_state - .as_ref() - .map_or(MembershipState::Join, |since_state| { - since_state - .as_ref() - .and_then(|since_state| { - since_state - .get(&(EventType::RoomMember, user_id.clone())) - .and_then(|since_member| { - serde_json::from_value::< + let since_membership = + since_state + .as_ref() + .map_or(MembershipState::Join, |since_state| { + since_state + .as_ref() + .and_then(|since_state| { + since_state + .get(&(EventType::RoomMember, user_id.clone())) + .and_then(|since_member| { + serde_json::from_value::< Raw, >( since_member.content.clone() @@ -188,50 +196,158 @@ pub async fn sync_events_route( Error::bad_database("Invalid PDU in database.") }) .ok() - }) - }) - .map_or(MembershipState::Leave, |member| member.membership) - }); + }) + }) + .map_or(MembershipState::Leave, |member| member.membership) + }); - let user_id = UserId::try_from(user_id.clone()) - .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; + let user_id = UserId::try_from(user_id.clone()) + .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; - match (since_membership, current_membership) { - (MembershipState::Leave, MembershipState::Join) => { - // A new user joined an encrypted room - if !share_encrypted_room(&db, &sender_user, &user_id, &room_id) { - device_list_updates.insert(user_id); + match (since_membership, current_membership) { + (MembershipState::Leave, MembershipState::Join) => { + // A new user joined an encrypted room + if !share_encrypted_room(&db, &sender_user, &user_id, &room_id) { + device_list_updates.insert(user_id); + } } + (MembershipState::Join, MembershipState::Leave) => { + // Write down users that have left encrypted rooms we are in + left_encrypted_users.insert(user_id); + } + _ => {} } - (MembershipState::Join, MembershipState::Leave) => { - // Write down users that have left encrypted rooms we are in - left_encrypted_users.insert(user_id); - } - _ => {} } } - } - let joined_since_last_sync = since_sender_member.map_or(false, |member| { - member.map_or(true, |member| member.membership != MembershipState::Join) - }); + let joined_since_last_sync = since_sender_member.map_or(false, |member| { + member.map_or(true, |member| member.membership != MembershipState::Join) + }); - if joined_since_last_sync && encrypted_room || new_encrypted_room { - // If the user is in a new encrypted room, give them all joined users - device_list_updates.extend( + if joined_since_last_sync && encrypted_room || new_encrypted_room { + // If the user is in a new encrypted room, give them all joined users + device_list_updates.extend( + db.rooms + .room_members(&room_id) + .filter_map(|user_id| Some(user_id.ok()?)) + .filter(|user_id| { + // Don't send key updates from the sender to the sender + sender_user != user_id + }) + .filter(|user_id| { + // Only send keys if the sender doesn't share an encrypted room with the target already + !share_encrypted_room(&db, sender_user, user_id, &room_id) + }), + ); + } + + let (joined_member_count, invited_member_count, heroes) = if send_member_count { + let joined_member_count = db.rooms.room_members(&room_id).count(); + let invited_member_count = db.rooms.room_members_invited(&room_id).count(); + + // Recalculate heroes (first 5 members) + let mut heroes = Vec::new(); + + if joined_member_count + invited_member_count <= 5 { + // Go through all PDUs and for each member event, check if the user is still joined or + // invited until we have 5 or we reach the end + + for hero in db + .rooms + .all_pdus(&sender_user, &room_id)? + .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus + .filter(|(_, pdu)| pdu.kind == EventType::RoomMember) + .map(|(_, pdu)| { + let content = serde_json::from_value::< + Raw, + >(pdu.content.clone()) + .expect("Raw::from_value always works") + .deserialize() + .map_err(|_| { + Error::bad_database("Invalid member event in database.") + })?; + + if let Some(state_key) = &pdu.state_key { + let user_id = + UserId::try_from(state_key.clone()).map_err(|_| { + Error::bad_database("Invalid UserId in member PDU.") + })?; + + // The membership was and still is invite or join + if matches!( + content.membership, + MembershipState::Join | MembershipState::Invite + ) && (db.rooms.is_joined(&user_id, &room_id)? + || db.rooms.is_invited(&user_id, &room_id)?) + { + Ok::<_, Error>(Some(state_key.clone())) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) + .filter_map(|u| u.ok()) // Filter out buggy users + // Filter for possible heroes + .filter_map(|u| u) + { + if heroes.contains(&hero) || hero == sender_user.as_str() { + continue; + } + + heroes.push(hero); + } + } + + ( + Some(joined_member_count), + Some(invited_member_count), + heroes, + ) + } else { + (None, None, Vec::new()) + }; + + let state_events = if joined_since_last_sync { db.rooms - .room_members(&room_id) - .filter_map(|user_id| Some(user_id.ok()?)) - .filter(|user_id| { - // Don't send key updates from the sender to the sender - sender_user != user_id - }) - .filter(|user_id| { - // Only send keys if the sender doesn't share an encrypted room with the target already - !share_encrypted_room(&db, sender_user, user_id, &room_id) - }), - ); - } + .room_state_full(&room_id)? + .into_iter() + .map(|(_, pdu)| pdu.to_sync_state_event()) + .collect() + } else { + match since_state { + None => Vec::new(), + Some(Some(since_state)) => current_state + .iter() + .filter(|(key, value)| { + since_state.get(key).map(|e| &e.event_id) != Some(&value.event_id) + }) + .filter(|(_, value)| { + !timeline_pdus.iter().any(|(_, timeline_pdu)| { + timeline_pdu.kind == value.kind + && timeline_pdu.state_key == value.state_key + }) + }) + .map(|(_, pdu)| pdu.to_sync_state_event()) + .collect(), + Some(None) => current_state + .iter() + .map(|(_, pdu)| pdu.to_sync_state_event()) + .collect(), + } + }; + + ( + heroes, + joined_member_count, + invited_member_count, + joined_since_last_sync, + state_events, + ) + } else { + (Vec::new(), None, None, false, Vec::new()) + }; // Look for device list updates in this room device_list_updates.extend( @@ -240,71 +356,6 @@ pub async fn sync_events_route( .filter_map(|r| r.ok()), ); - let (joined_member_count, invited_member_count, heroes) = if send_member_count { - let joined_member_count = db.rooms.room_members(&room_id).count(); - let invited_member_count = db.rooms.room_members_invited(&room_id).count(); - - // Recalculate heroes (first 5 members) - let mut heroes = Vec::new(); - - if joined_member_count + invited_member_count <= 5 { - // Go through all PDUs and for each member event, check if the user is still joined or - // invited until we have 5 or we reach the end - - for hero in db - .rooms - .all_pdus(&sender_user, &room_id)? - .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus - .filter(|(_, pdu)| pdu.kind == EventType::RoomMember) - .map(|(_, pdu)| { - let content = serde_json::from_value::< - Raw, - >(pdu.content.clone()) - .expect("Raw::from_value always works") - .deserialize() - .map_err(|_| Error::bad_database("Invalid member event in database."))?; - - if let Some(state_key) = &pdu.state_key { - let user_id = UserId::try_from(state_key.clone()).map_err(|_| { - Error::bad_database("Invalid UserId in member PDU.") - })?; - - // The membership was and still is invite or join - if matches!( - content.membership, - MembershipState::Join | MembershipState::Invite - ) && (db.rooms.is_joined(&user_id, &room_id)? - || db.rooms.is_invited(&user_id, &room_id)?) - { - Ok::<_, Error>(Some(state_key.clone())) - } else { - Ok(None) - } - } else { - Ok(None) - } - }) - .filter_map(|u| u.ok()) // Filter out buggy users - // Filter for possible heroes - .filter_map(|u| u) - { - if heroes.contains(&hero) || hero == sender_user.as_str() { - continue; - } - - heroes.push(hero); - } - } - - ( - Some(joined_member_count), - Some(invited_member_count), - heroes, - ) - } else { - (None, None, Vec::new()) - }; - let notification_count = if send_notification_counts { if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_user)? { Some( @@ -385,34 +436,7 @@ pub async fn sync_events_route( events: room_events, }, state: sync_events::State { - events: if joined_since_last_sync { - db.rooms - .room_state_full(&room_id)? - .into_iter() - .map(|(_, pdu)| pdu.to_sync_state_event()) - .collect() - } else { - match since_state { - None => Vec::new(), - Some(Some(since_state)) => current_state - .iter() - .filter(|(key, value)| { - since_state.get(key).map(|e| &e.event_id) != Some(&value.event_id) - }) - .filter(|(_, value)| { - !timeline_pdus.iter().any(|(_, timeline_pdu)| { - timeline_pdu.kind == value.kind - && timeline_pdu.state_key == value.state_key - }) - }) - .map(|(_, pdu)| pdu.to_sync_state_event()) - .collect(), - Some(None) => current_state - .iter() - .map(|(_, pdu)| pdu.to_sync_state_event()) - .collect(), - } - }, + events: state_events, }, ephemeral: sync_events::Ephemeral { events: edus }, }; diff --git a/src/database.rs b/src/database.rs index 607e1be..afcd58f 100644 --- a/src/database.rs +++ b/src/database.rs @@ -27,7 +27,7 @@ pub struct Config { server_name: Box, database_path: String, #[serde(default = "default_cache_capacity")] - cache_capacity: u64, + cache_capacity: u32, #[serde(default = "default_max_request_size")] max_request_size: u32, #[serde(default = "default_max_concurrent_requests")] @@ -48,7 +48,7 @@ fn true_fn() -> bool { true } -fn default_cache_capacity() -> u64 { +fn default_cache_capacity() -> u32 { 1024 * 1024 * 1024 } @@ -93,8 +93,7 @@ impl Database { pub async fn load_or_create(config: Config) -> Result { let db = sled::Config::default() .path(&config.database_path) - .cache_capacity(config.cache_capacity) - .print_profile_on_drop(false) + .cache_capacity(config.cache_capacity as u64) .open()?; info!("Opened sled database at {}", config.database_path); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 4081944..b35d006 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1010,6 +1010,10 @@ impl Rooms { .filter_map(|r| r.ok()) .any(|room_alias| aliases.is_match(room_alias.as_str())) }) || rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into())) + || self + .room_members(&room_id) + .filter_map(|r| r.ok()) + .any(|member| users.iter().any(|regex| regex.is_match(member.as_str()))) { sending.send_pdu_appservice(&appservice.0, &pdu_id)?; } diff --git a/src/database/sending.rs b/src/database/sending.rs index 74aad32..0a66f73 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -8,7 +8,7 @@ use std::{ use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; use federation::transactions::send_transaction_message; -use log::info; +use log::{error, info}; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ api::{appservice, federation, OutgoingRequest}, @@ -131,6 +131,7 @@ impl Sending { }; prefix.extend_from_slice(server.as_bytes()); prefix.push(0xff); + last_failed_try.insert(server.clone(), match last_failed_try.get(&server) { Some(last_failed) => { (last_failed.0+1, Instant::now()) diff --git a/src/main.rs b/src/main.rs index 93ab560..65434a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ pub use database::Database; pub use error::{ConduitLogger, Error, Result}; pub use pdu::PduEvent; pub use rocket::State; +use ruma::api::client::error::ErrorKind; pub use ruma_wrapper::{ConduitResult, Ruma, RumaResponse}; use log::LevelFilter; @@ -154,7 +155,13 @@ fn setup_rocket() -> rocket::Rocket { server_server::get_profile_information_route, ], ) - .register(catchers![not_found_catcher]) + .register(catchers![ + not_found_catcher, + forbidden_catcher, + unknown_token_catcher, + missing_token_catcher, + bad_json_catcher + ]) .attach(AdHoc::on_attach("Config", |rocket| async { let config = rocket .figment() @@ -186,3 +193,26 @@ async fn main() { fn not_found_catcher(_req: &'_ Request<'_>) -> String { "404 Not Found".to_owned() } + +#[catch(580)] +fn forbidden_catcher() -> Result<()> { + Err(Error::BadRequest(ErrorKind::Forbidden, "Forbidden.")) +} + +#[catch(581)] +fn unknown_token_catcher() -> Result<()> { + Err(Error::BadRequest( + ErrorKind::UnknownToken { soft_logout: false }, + "Unknown token.", + )) +} + +#[catch(582)] +fn missing_token_catcher() -> Result<()> { + Err(Error::BadRequest(ErrorKind::MissingToken, "Missing token.")) +} + +#[catch(583)] +fn bad_json_catcher() -> Result<()> { + Err(Error::BadRequest(ErrorKind::BadJson, "Bad json.")) +} diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index 0fdca74..45fcc7f 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -45,7 +45,7 @@ where http::request::Request>, >>::Error: std::fmt::Debug, { - type Error = (); // TODO: Better error handling + type Error = (); type Owned = Data; type Borrowed = Self::Owned; @@ -82,7 +82,9 @@ where registration .get("as_token") .and_then(|as_token| as_token.as_str()) - .map_or(false, |as_token| token.as_deref() == Some(as_token)) + .map_or(false, |as_token| { + dbg!(token.as_deref()) == dbg!(Some(as_token)) + }) }) { match T::METADATA.authentication { AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => { @@ -105,7 +107,8 @@ where ); if !db.users.exists(&user_id).unwrap() { - return Failure((Status::Unauthorized, ())); + // Forbidden + return Failure((Status::raw(580), ())); } // TODO: Check if appservice is allowed to be that user @@ -119,15 +122,15 @@ where AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => { if let Some(token) = token { match db.users.find_from_token(&token).unwrap() { - // TODO: M_UNKNOWN_TOKEN - None => return Failure((Status::Unauthorized, ())), + // Unknown Token + None => return Failure((Status::raw(581), ())), Some((user_id, device_id)) => { (Some(user_id), Some(device_id.into()), false) } } } else { - // TODO: M_MISSING_TOKEN - return Failure((Status::Unauthorized, ())); + // Missing Token + return Failure((Status::raw(582), ())); } } AuthScheme::ServerSignatures => (None, None, false), @@ -163,7 +166,7 @@ where }), Err(e) => { warn!("{:?}", e); - Failure((Status::BadRequest, ())) + Failure((Status::raw(583), ())) } } }) diff --git a/src/server_server.rs b/src/server_server.rs index 7ff9e3f..3fea4da 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -557,7 +557,7 @@ pub async fn send_transaction_message_route<'a>( pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); - db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; + let next_room_state = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; db.rooms.append_pdu( &pdu, @@ -569,6 +569,8 @@ pub async fn send_transaction_message_route<'a>( &db.admin, )?; + db.rooms.set_room_state(&room_id, &next_room_state)?; + for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; }