diff --git a/README.md b/README.md index e856dd5..fde762c 100644 --- a/README.md +++ b/README.md @@ -38,8 +38,8 @@ are not aware of such a bug today, but we would like to do more testing. There are still a few important features missing: - Database stability (currently you might have to do manual upgrades or even wipe the db for new versions) -- End-to-end encrypted chats over federation -- Typing, presence, read receipts etc. over federation +- Edge cases for end-to-end encryption over federation +- Typing and presence over federation - Lots of testing Check out the [Conduit 1.0 Release Milestone](https://gitlab.com/famedly/conduit/-/milestones/3). diff --git a/src/client_server/keys.rs b/src/client_server/keys.rs index 8f1afba..418e41a 100644 --- a/src/client_server/keys.rs +++ b/src/client_server/keys.rs @@ -1,19 +1,23 @@ use super::SESSION_ID_LENGTH; use crate::{database::DatabaseGuard, utils, ConduitResult, Database, Error, Result, Ruma}; use ruma::{ - api::client::{ - error::ErrorKind, - r0::{ - keys::{ - claim_keys, get_key_changes, get_keys, upload_keys, upload_signatures, - upload_signing_keys, + api::{ + client::{ + error::ErrorKind, + r0::{ + keys::{ + claim_keys, get_key_changes, get_keys, upload_keys, upload_signatures, + upload_signing_keys, + }, + uiaa::{AuthFlow, UiaaInfo}, }, - uiaa::{AuthFlow, UiaaInfo}, }, + federation, }, encryption::UnsignedDeviceInfo, DeviceId, DeviceKeyAlgorithm, UserId, }; +use serde_json::json; use std::collections::{BTreeMap, HashSet}; #[cfg(feature = "conduit_bin")] @@ -84,7 +88,8 @@ pub async fn get_keys_route( &body.device_keys, |u| u == sender_user, &db, - )?; + ) + .await?; Ok(response.into()) } @@ -98,7 +103,7 @@ pub async fn claim_keys_route( db: DatabaseGuard, body: Ruma, ) -> ConduitResult { - let response = claim_keys_helper(&body.one_time_keys, &db)?; + let response = claim_keys_helper(&body.one_time_keys, &db).await?; db.flush().await?; @@ -278,7 +283,7 @@ pub async fn get_key_changes_route( .into()) } -pub fn get_keys_helper bool>( +pub async fn get_keys_helper bool>( sender_user: Option<&UserId>, device_keys_input: &BTreeMap>>, allowed_signatures: F, @@ -289,7 +294,17 @@ pub fn get_keys_helper bool>( let mut user_signing_keys = BTreeMap::new(); let mut device_keys = BTreeMap::new(); + let mut get_over_federation = BTreeMap::new(); + for (user_id, device_ids) in device_keys_input { + if user_id.server_name() != db.globals.server_name() { + get_over_federation + .entry(user_id.server_name()) + .or_insert_with(Vec::new) + .push((user_id, device_ids)); + continue; + } + if device_ids.is_empty() { let mut container = BTreeMap::new(); for device_id in db.users.all_device_ids(user_id) { @@ -347,21 +362,60 @@ pub fn get_keys_helper bool>( } } + let mut failures = BTreeMap::new(); + + for (server, vec) in get_over_federation { + let mut device_keys_input_fed = BTreeMap::new(); + for (user_id, keys) in vec { + device_keys_input_fed.insert(user_id.clone(), keys.clone()); + } + match db + .sending + .send_federation_request( + &db.globals, + server, + federation::keys::get_keys::v1::Request { + device_keys: device_keys_input_fed, + }, + ) + .await + { + Ok(response) => { + master_keys.extend(response.master_keys); + self_signing_keys.extend(response.self_signing_keys); + device_keys.extend(response.device_keys); + } + Err(_e) => { + failures.insert(server.to_string(), json!({})); + } + } + } + Ok(get_keys::Response { master_keys, self_signing_keys, user_signing_keys, device_keys, - failures: BTreeMap::new(), + failures, }) } -pub fn claim_keys_helper( +pub async fn claim_keys_helper( one_time_keys_input: &BTreeMap, DeviceKeyAlgorithm>>, db: &Database, ) -> Result { let mut one_time_keys = BTreeMap::new(); + + let mut get_over_federation = BTreeMap::new(); + for (user_id, map) in one_time_keys_input { + if user_id.server_name() != db.globals.server_name() { + get_over_federation + .entry(user_id.server_name()) + .or_insert_with(Vec::new) + .push((user_id, map)); + } + let mut container = BTreeMap::new(); for (device_id, key_algorithm) in map { if let Some(one_time_keys) = @@ -376,6 +430,26 @@ pub fn claim_keys_helper( one_time_keys.insert(user_id.clone(), container); } + for (server, vec) in get_over_federation { + let mut one_time_keys_input_fed = BTreeMap::new(); + for (user_id, keys) in vec { + one_time_keys_input_fed.insert(user_id.clone(), keys.clone()); + } + // Ignore failures + let keys = db + .sending + .send_federation_request( + &db.globals, + server, + federation::keys::claim_keys::v1::Request { + one_time_keys: one_time_keys_input_fed, + }, + ) + .await?; + + one_time_keys.extend(keys.one_time_keys); + } + Ok(claim_keys::Response { failures: BTreeMap::new(), one_time_keys, diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index d8c2781..e17a4d7 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -836,7 +836,7 @@ pub async fn invite_helper<'a>( is_direct: Some(is_direct), membership: MembershipState::Invite, third_party_invite: None, - blurhash: db.users.blurhash(&sender_user)?, + blurhash: None, }) .expect("member event is valid value"); @@ -1012,7 +1012,7 @@ pub async fn invite_helper<'a>( avatar_url: db.users.avatar_url(&user_id)?, is_direct: Some(is_direct), third_party_invite: None, - blurhash: db.users.blurhash(&sender_user)?, + blurhash: db.users.blurhash(&user_id)?, }) .expect("event is valid, we just created it"), unsigned: None, diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 1938c87..693254f 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -1,10 +1,13 @@ use crate::{database::DatabaseGuard, pdu::PduBuilder, utils, ConduitResult, Error, Ruma}; use ruma::{ - api::client::{ - error::ErrorKind, - r0::profile::{ - get_avatar_url, get_display_name, get_profile, set_avatar_url, set_display_name, + api::{ + client::{ + error::ErrorKind, + r0::profile::{ + get_avatar_url, get_display_name, get_profile, set_avatar_url, set_display_name, + }, }, + federation::{self, query::get_profile_information::v1::ProfileField}, }, events::EventType, serde::Raw, @@ -120,6 +123,25 @@ pub async fn get_displayname_route( db: DatabaseGuard, body: Ruma>, ) -> ConduitResult { + if body.user_id.server_name() != db.globals.server_name() { + let response = db + .sending + .send_federation_request( + &db.globals, + body.user_id.server_name(), + federation::query::get_profile_information::v1::Request { + user_id: &body.user_id, + field: Some(&ProfileField::DisplayName), + }, + ) + .await?; + + return Ok(get_display_name::Response { + displayname: response.displayname, + } + .into()); + } + Ok(get_display_name::Response { displayname: db.users.displayname(&body.user_id)?, } @@ -234,6 +256,26 @@ pub async fn get_avatar_url_route( db: DatabaseGuard, body: Ruma>, ) -> ConduitResult { + if body.user_id.server_name() != db.globals.server_name() { + let response = db + .sending + .send_federation_request( + &db.globals, + body.user_id.server_name(), + federation::query::get_profile_information::v1::Request { + user_id: &body.user_id, + field: Some(&ProfileField::AvatarUrl), + }, + ) + .await?; + + return Ok(get_avatar_url::Response { + avatar_url: response.avatar_url, + blurhash: response.blurhash, + } + .into()); + } + Ok(get_avatar_url::Response { avatar_url: db.users.avatar_url(&body.user_id)?, blurhash: db.users.blurhash(&body.user_id)?, @@ -250,6 +292,27 @@ pub async fn get_profile_route( db: DatabaseGuard, body: Ruma>, ) -> ConduitResult { + if body.user_id.server_name() != db.globals.server_name() { + let response = db + .sending + .send_federation_request( + &db.globals, + body.user_id.server_name(), + federation::query::get_profile_information::v1::Request { + user_id: &body.user_id, + field: None, + }, + ) + .await?; + + return Ok(get_profile::Response { + displayname: response.displayname, + avatar_url: response.avatar_url, + blurhash: response.blurhash, + } + .into()); + } + if !db.users.exists(&body.user_id)? { // Return 404 if this user doesn't exist return Err(Error::BadRequest( diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 427b9ad..fdb8f25 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -421,7 +421,10 @@ async fn sync_helper( let send_member_count = state_events .iter() - .any(|event| event.kind == EventType::RoomMember); + .any(|event| event.kind == EventType::RoomMember) + || timeline_pdus.iter().any(|(_, event)| { + event.state_key.is_some() && event.kind == EventType::RoomMember + }); if encrypted_room { for (user_id, current_member) in db diff --git a/src/client_server/to_device.rs b/src/client_server/to_device.rs index 7896af9..e3fd780 100644 --- a/src/client_server/to_device.rs +++ b/src/client_server/to_device.rs @@ -1,6 +1,12 @@ +use std::collections::BTreeMap; + use crate::{database::DatabaseGuard, ConduitResult, Error, Ruma}; use ruma::{ - api::client::{error::ErrorKind, r0::to_device::send_event_to_device}, + api::{ + client::{error::ErrorKind, r0::to_device::send_event_to_device}, + federation::{self, transactions::edu::DirectDeviceContent}, + }, + events::EventType, to_device::DeviceIdOrAllDevices, }; @@ -33,6 +39,28 @@ pub async fn send_event_to_device_route( for (target_user_id, map) in &body.messages { for (target_device_id_maybe, event) in map { + if target_user_id.server_name() != db.globals.server_name() { + let mut map = BTreeMap::new(); + map.insert(target_device_id_maybe.clone(), event.clone()); + let mut messages = BTreeMap::new(); + messages.insert(target_user_id.clone(), map); + + db.sending.send_reliable_edu( + target_user_id.server_name(), + &serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice( + DirectDeviceContent { + sender: sender_user.clone(), + ev_type: EventType::from(&body.event_type), + message_id: body.txn_id.clone(), + messages, + }, + )) + .expect("DirectToDevice EDU can be serialized"), + )?; + + continue; + } + match target_device_id_maybe { DeviceIdOrAllDevices::DeviceId(target_device_id) => db.users.add_to_device_event( sender_user, diff --git a/src/database/sending.rs b/src/database/sending.rs index 7c9cf64..a07192e 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -164,8 +164,8 @@ impl Sending { // Find events that have been added since starting the last request let new_events = guard.sending.servernamepduids .scan_prefix(prefix.clone()) - .map(|(k, _)| { - SendingEventType::Pdu(k[prefix.len()..].to_vec()) + .filter_map(|(k, _)| { + Self::parse_servercurrentevent(&k).ok().map(|ev| (ev, k)) }) .take(30) .collect::>(); @@ -174,16 +174,9 @@ impl Sending { if !new_events.is_empty() { // Insert pdus we found - for event in &new_events { - let mut current_key = prefix.clone(); - match event { - SendingEventType::Pdu(b) | - SendingEventType::Edu(b) => { - current_key.extend_from_slice(&b); - guard.sending.servercurrentevents.insert(¤t_key, &[]).unwrap(); - guard.sending.servernamepduids.remove(¤t_key).unwrap(); - } - } + for (_, key) in &new_events { + guard.sending.servercurrentevents.insert(&key, &[]).unwrap(); + guard.sending.servernamepduids.remove(&key).unwrap(); } drop(guard); @@ -191,7 +184,7 @@ impl Sending { futures.push( Self::handle_events( outgoing_kind.clone(), - new_events, + new_events.into_iter().map(|(event, _)| event.1).collect(), Arc::clone(&db), ) ); @@ -290,7 +283,8 @@ impl Sending { if let OutgoingKind::Normal(server_name) = outgoing_kind { if let Ok((select_edus, last_count)) = Self::select_edus(db, server_name) { - events.extend_from_slice(&select_edus); + events.extend(select_edus.into_iter().map(SendingEventType::Edu)); + db.sending .servername_educount .insert(server_name.as_bytes(), &last_count.to_be_bytes())?; @@ -301,7 +295,7 @@ impl Sending { Ok(Some(events)) } - pub fn select_edus(db: &Database, server: &ServerName) -> Result<(Vec, u64)> { + pub fn select_edus(db: &Database, server: &ServerName) -> Result<(Vec>, u64)> { // u64: count of last edu let since = db .sending @@ -366,9 +360,7 @@ impl Sending { } }; - events.push(SendingEventType::Edu( - serde_json::to_vec(&federation_event).expect("json can be serialized"), - )); + events.push(serde_json::to_vec(&federation_event).expect("json can be serialized")); if events.len() >= 20 { break 'outer; @@ -402,6 +394,18 @@ impl Sending { Ok(()) } + #[tracing::instrument(skip(self))] + pub fn send_reliable_edu(&self, server: &ServerName, serialized: &[u8]) -> Result<()> { + let mut key = server.as_bytes().to_vec(); + key.push(0xff); + key.push(b'*'); + key.extend_from_slice(serialized); + self.servernamepduids.insert(&key, b"")?; + self.sender.unbounded_send(key).unwrap(); + + Ok(()) + } + #[tracing::instrument(skip(self))] pub fn send_pdu_appservice(&self, appservice_id: &str, pdu_id: &[u8]) -> Result<()> { let mut key = b"+".to_vec(); diff --git a/src/server_server.rs b/src/server_server.rs index d51b672..f666188 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1027,42 +1027,39 @@ pub fn handle_incoming_pdu<'a>( .map_err(|_| "Failed talking to db".to_owned())? .map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok()) .flatten(); - if let Some(mut state) = state { - if db - .rooms - .get_pdu(prev_event) - .ok() - .flatten() - .ok_or_else(|| "Could not find prev event, but we know the state.".to_owned())? - .state_key - .is_some() - { - state.insert(prev_event.clone()); - } - state_at_incoming_event = Some( - fetch_and_handle_events( - db, - origin, - &state.into_iter().collect::>(), - &room_id, - pub_key_map, - ) - .await - .map_err(|_| "Failed to fetch state events locally".to_owned())? - .into_iter() - .map(|pdu| { + if let Some(state) = state { + let mut state = fetch_and_handle_events( + db, + origin, + &state.into_iter().collect::>(), + &room_id, + pub_key_map, + ) + .await + .map_err(|_| "Failed to fetch state events locally".to_owned())? + .into_iter() + .map(|pdu| { + ( ( - ( - pdu.kind.clone(), - pdu.state_key - .clone() - .expect("events from state_full_ids are state events"), - ), - pdu, - ) - }) - .collect(), - ); + pdu.kind.clone(), + pdu.state_key + .clone() + .expect("events from state_full_ids are state events"), + ), + pdu, + ) + }) + .collect::>(); + + let prev_pdu = db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { + "Could not find prev event, but we know the state.".to_owned() + })?; + + if let Some(state_key) = &prev_pdu.state_key { + state.insert((prev_pdu.kind.clone(), state_key.clone()), prev_pdu); + } + + state_at_incoming_event = Some(state); } // TODO: set incoming_auth_events? } @@ -2461,7 +2458,8 @@ pub async fn get_keys_route( &body.device_keys, |u| Some(u.server_name()) == body.sender_servername.as_deref(), &db, - )?; + ) + .await?; db.flush().await?; @@ -2486,7 +2484,7 @@ pub async fn claim_keys_route( return Err(Error::bad_config("Federation is disabled.")); } - let result = claim_keys_helper(&body.one_time_keys, &db)?; + let result = claim_keys_helper(&body.one_time_keys, &db).await?; db.flush().await?;