Merge branch 'e2ee-fed' into 'master'
Initial E2EE over federation support See merge request famedly/conduit!135
This commit is contained in:
		
						commit
						f43fd6a475
					
				
					 8 changed files with 247 additions and 77 deletions
				
			
		|  | @ -38,8 +38,8 @@ are not aware of such a bug today, but we would like to do more testing. | ||||||
| There are still a few important features missing: | There are still a few important features missing: | ||||||
| 
 | 
 | ||||||
| - Database stability (currently you might have to do manual upgrades or even wipe the db for new versions) | - Database stability (currently you might have to do manual upgrades or even wipe the db for new versions) | ||||||
| - End-to-end encrypted chats over federation | - Edge cases for end-to-end encryption over federation | ||||||
| - Typing, presence, read receipts etc. over federation | - Typing and presence over federation | ||||||
| - Lots of testing | - Lots of testing | ||||||
| 
 | 
 | ||||||
| Check out the [Conduit 1.0 Release Milestone](https://gitlab.com/famedly/conduit/-/milestones/3). | Check out the [Conduit 1.0 Release Milestone](https://gitlab.com/famedly/conduit/-/milestones/3). | ||||||
|  |  | ||||||
|  | @ -1,19 +1,23 @@ | ||||||
| use super::SESSION_ID_LENGTH; | use super::SESSION_ID_LENGTH; | ||||||
| use crate::{database::DatabaseGuard, utils, ConduitResult, Database, Error, Result, Ruma}; | use crate::{database::DatabaseGuard, utils, ConduitResult, Database, Error, Result, Ruma}; | ||||||
| use ruma::{ | use ruma::{ | ||||||
|     api::client::{ |     api::{ | ||||||
|         error::ErrorKind, |         client::{ | ||||||
|         r0::{ |             error::ErrorKind, | ||||||
|             keys::{ |             r0::{ | ||||||
|                 claim_keys, get_key_changes, get_keys, upload_keys, upload_signatures, |                 keys::{ | ||||||
|                 upload_signing_keys, |                     claim_keys, get_key_changes, get_keys, upload_keys, upload_signatures, | ||||||
|  |                     upload_signing_keys, | ||||||
|  |                 }, | ||||||
|  |                 uiaa::{AuthFlow, UiaaInfo}, | ||||||
|             }, |             }, | ||||||
|             uiaa::{AuthFlow, UiaaInfo}, |  | ||||||
|         }, |         }, | ||||||
|  |         federation, | ||||||
|     }, |     }, | ||||||
|     encryption::UnsignedDeviceInfo, |     encryption::UnsignedDeviceInfo, | ||||||
|     DeviceId, DeviceKeyAlgorithm, UserId, |     DeviceId, DeviceKeyAlgorithm, UserId, | ||||||
| }; | }; | ||||||
|  | use serde_json::json; | ||||||
| use std::collections::{BTreeMap, HashSet}; | use std::collections::{BTreeMap, HashSet}; | ||||||
| 
 | 
 | ||||||
| #[cfg(feature = "conduit_bin")] | #[cfg(feature = "conduit_bin")] | ||||||
|  | @ -84,7 +88,8 @@ pub async fn get_keys_route( | ||||||
|         &body.device_keys, |         &body.device_keys, | ||||||
|         |u| u == sender_user, |         |u| u == sender_user, | ||||||
|         &db, |         &db, | ||||||
|     )?; |     ) | ||||||
|  |     .await?; | ||||||
| 
 | 
 | ||||||
|     Ok(response.into()) |     Ok(response.into()) | ||||||
| } | } | ||||||
|  | @ -98,7 +103,7 @@ pub async fn claim_keys_route( | ||||||
|     db: DatabaseGuard, |     db: DatabaseGuard, | ||||||
|     body: Ruma<claim_keys::Request>, |     body: Ruma<claim_keys::Request>, | ||||||
| ) -> ConduitResult<claim_keys::Response> { | ) -> ConduitResult<claim_keys::Response> { | ||||||
|     let response = claim_keys_helper(&body.one_time_keys, &db)?; |     let response = claim_keys_helper(&body.one_time_keys, &db).await?; | ||||||
| 
 | 
 | ||||||
|     db.flush().await?; |     db.flush().await?; | ||||||
| 
 | 
 | ||||||
|  | @ -278,7 +283,7 @@ pub async fn get_key_changes_route( | ||||||
|     .into()) |     .into()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub fn get_keys_helper<F: Fn(&UserId) -> bool>( | pub async fn get_keys_helper<F: Fn(&UserId) -> bool>( | ||||||
|     sender_user: Option<&UserId>, |     sender_user: Option<&UserId>, | ||||||
|     device_keys_input: &BTreeMap<UserId, Vec<Box<DeviceId>>>, |     device_keys_input: &BTreeMap<UserId, Vec<Box<DeviceId>>>, | ||||||
|     allowed_signatures: F, |     allowed_signatures: F, | ||||||
|  | @ -289,7 +294,17 @@ pub fn get_keys_helper<F: Fn(&UserId) -> bool>( | ||||||
|     let mut user_signing_keys = BTreeMap::new(); |     let mut user_signing_keys = BTreeMap::new(); | ||||||
|     let mut device_keys = BTreeMap::new(); |     let mut device_keys = BTreeMap::new(); | ||||||
| 
 | 
 | ||||||
|  |     let mut get_over_federation = BTreeMap::new(); | ||||||
|  | 
 | ||||||
|     for (user_id, device_ids) in device_keys_input { |     for (user_id, device_ids) in device_keys_input { | ||||||
|  |         if user_id.server_name() != db.globals.server_name() { | ||||||
|  |             get_over_federation | ||||||
|  |                 .entry(user_id.server_name()) | ||||||
|  |                 .or_insert_with(Vec::new) | ||||||
|  |                 .push((user_id, device_ids)); | ||||||
|  |             continue; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         if device_ids.is_empty() { |         if device_ids.is_empty() { | ||||||
|             let mut container = BTreeMap::new(); |             let mut container = BTreeMap::new(); | ||||||
|             for device_id in db.users.all_device_ids(user_id) { |             for device_id in db.users.all_device_ids(user_id) { | ||||||
|  | @ -347,21 +362,60 @@ pub fn get_keys_helper<F: Fn(&UserId) -> bool>( | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     let mut failures = BTreeMap::new(); | ||||||
|  | 
 | ||||||
|  |     for (server, vec) in get_over_federation { | ||||||
|  |         let mut device_keys_input_fed = BTreeMap::new(); | ||||||
|  |         for (user_id, keys) in vec { | ||||||
|  |             device_keys_input_fed.insert(user_id.clone(), keys.clone()); | ||||||
|  |         } | ||||||
|  |         match db | ||||||
|  |             .sending | ||||||
|  |             .send_federation_request( | ||||||
|  |                 &db.globals, | ||||||
|  |                 server, | ||||||
|  |                 federation::keys::get_keys::v1::Request { | ||||||
|  |                     device_keys: device_keys_input_fed, | ||||||
|  |                 }, | ||||||
|  |             ) | ||||||
|  |             .await | ||||||
|  |         { | ||||||
|  |             Ok(response) => { | ||||||
|  |                 master_keys.extend(response.master_keys); | ||||||
|  |                 self_signing_keys.extend(response.self_signing_keys); | ||||||
|  |                 device_keys.extend(response.device_keys); | ||||||
|  |             } | ||||||
|  |             Err(_e) => { | ||||||
|  |                 failures.insert(server.to_string(), json!({})); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     Ok(get_keys::Response { |     Ok(get_keys::Response { | ||||||
|         master_keys, |         master_keys, | ||||||
|         self_signing_keys, |         self_signing_keys, | ||||||
|         user_signing_keys, |         user_signing_keys, | ||||||
|         device_keys, |         device_keys, | ||||||
|         failures: BTreeMap::new(), |         failures, | ||||||
|     }) |     }) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub fn claim_keys_helper( | pub async fn claim_keys_helper( | ||||||
|     one_time_keys_input: &BTreeMap<UserId, BTreeMap<Box<DeviceId>, DeviceKeyAlgorithm>>, |     one_time_keys_input: &BTreeMap<UserId, BTreeMap<Box<DeviceId>, DeviceKeyAlgorithm>>, | ||||||
|     db: &Database, |     db: &Database, | ||||||
| ) -> Result<claim_keys::Response> { | ) -> Result<claim_keys::Response> { | ||||||
|     let mut one_time_keys = BTreeMap::new(); |     let mut one_time_keys = BTreeMap::new(); | ||||||
|  | 
 | ||||||
|  |     let mut get_over_federation = BTreeMap::new(); | ||||||
|  | 
 | ||||||
|     for (user_id, map) in one_time_keys_input { |     for (user_id, map) in one_time_keys_input { | ||||||
|  |         if user_id.server_name() != db.globals.server_name() { | ||||||
|  |             get_over_federation | ||||||
|  |                 .entry(user_id.server_name()) | ||||||
|  |                 .or_insert_with(Vec::new) | ||||||
|  |                 .push((user_id, map)); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         let mut container = BTreeMap::new(); |         let mut container = BTreeMap::new(); | ||||||
|         for (device_id, key_algorithm) in map { |         for (device_id, key_algorithm) in map { | ||||||
|             if let Some(one_time_keys) = |             if let Some(one_time_keys) = | ||||||
|  | @ -376,6 +430,26 @@ pub fn claim_keys_helper( | ||||||
|         one_time_keys.insert(user_id.clone(), container); |         one_time_keys.insert(user_id.clone(), container); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     for (server, vec) in get_over_federation { | ||||||
|  |         let mut one_time_keys_input_fed = BTreeMap::new(); | ||||||
|  |         for (user_id, keys) in vec { | ||||||
|  |             one_time_keys_input_fed.insert(user_id.clone(), keys.clone()); | ||||||
|  |         } | ||||||
|  |         // Ignore failures
 | ||||||
|  |         let keys = db | ||||||
|  |             .sending | ||||||
|  |             .send_federation_request( | ||||||
|  |                 &db.globals, | ||||||
|  |                 server, | ||||||
|  |                 federation::keys::claim_keys::v1::Request { | ||||||
|  |                     one_time_keys: one_time_keys_input_fed, | ||||||
|  |                 }, | ||||||
|  |             ) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         one_time_keys.extend(keys.one_time_keys); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     Ok(claim_keys::Response { |     Ok(claim_keys::Response { | ||||||
|         failures: BTreeMap::new(), |         failures: BTreeMap::new(), | ||||||
|         one_time_keys, |         one_time_keys, | ||||||
|  |  | ||||||
|  | @ -836,7 +836,7 @@ pub async fn invite_helper<'a>( | ||||||
|             is_direct: Some(is_direct), |             is_direct: Some(is_direct), | ||||||
|             membership: MembershipState::Invite, |             membership: MembershipState::Invite, | ||||||
|             third_party_invite: None, |             third_party_invite: None, | ||||||
|             blurhash: db.users.blurhash(&sender_user)?, |             blurhash: None, | ||||||
|         }) |         }) | ||||||
|         .expect("member event is valid value"); |         .expect("member event is valid value"); | ||||||
| 
 | 
 | ||||||
|  | @ -1012,7 +1012,7 @@ pub async fn invite_helper<'a>( | ||||||
|                 avatar_url: db.users.avatar_url(&user_id)?, |                 avatar_url: db.users.avatar_url(&user_id)?, | ||||||
|                 is_direct: Some(is_direct), |                 is_direct: Some(is_direct), | ||||||
|                 third_party_invite: None, |                 third_party_invite: None, | ||||||
|                 blurhash: db.users.blurhash(&sender_user)?, |                 blurhash: db.users.blurhash(&user_id)?, | ||||||
|             }) |             }) | ||||||
|             .expect("event is valid, we just created it"), |             .expect("event is valid, we just created it"), | ||||||
|             unsigned: None, |             unsigned: None, | ||||||
|  |  | ||||||
|  | @ -1,10 +1,13 @@ | ||||||
| use crate::{database::DatabaseGuard, pdu::PduBuilder, utils, ConduitResult, Error, Ruma}; | use crate::{database::DatabaseGuard, pdu::PduBuilder, utils, ConduitResult, Error, Ruma}; | ||||||
| use ruma::{ | use ruma::{ | ||||||
|     api::client::{ |     api::{ | ||||||
|         error::ErrorKind, |         client::{ | ||||||
|         r0::profile::{ |             error::ErrorKind, | ||||||
|             get_avatar_url, get_display_name, get_profile, set_avatar_url, set_display_name, |             r0::profile::{ | ||||||
|  |                 get_avatar_url, get_display_name, get_profile, set_avatar_url, set_display_name, | ||||||
|  |             }, | ||||||
|         }, |         }, | ||||||
|  |         federation::{self, query::get_profile_information::v1::ProfileField}, | ||||||
|     }, |     }, | ||||||
|     events::EventType, |     events::EventType, | ||||||
|     serde::Raw, |     serde::Raw, | ||||||
|  | @ -120,6 +123,25 @@ pub async fn get_displayname_route( | ||||||
|     db: DatabaseGuard, |     db: DatabaseGuard, | ||||||
|     body: Ruma<get_display_name::Request<'_>>, |     body: Ruma<get_display_name::Request<'_>>, | ||||||
| ) -> ConduitResult<get_display_name::Response> { | ) -> ConduitResult<get_display_name::Response> { | ||||||
|  |     if body.user_id.server_name() != db.globals.server_name() { | ||||||
|  |         let response = db | ||||||
|  |             .sending | ||||||
|  |             .send_federation_request( | ||||||
|  |                 &db.globals, | ||||||
|  |                 body.user_id.server_name(), | ||||||
|  |                 federation::query::get_profile_information::v1::Request { | ||||||
|  |                     user_id: &body.user_id, | ||||||
|  |                     field: Some(&ProfileField::DisplayName), | ||||||
|  |                 }, | ||||||
|  |             ) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         return Ok(get_display_name::Response { | ||||||
|  |             displayname: response.displayname, | ||||||
|  |         } | ||||||
|  |         .into()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     Ok(get_display_name::Response { |     Ok(get_display_name::Response { | ||||||
|         displayname: db.users.displayname(&body.user_id)?, |         displayname: db.users.displayname(&body.user_id)?, | ||||||
|     } |     } | ||||||
|  | @ -234,6 +256,26 @@ pub async fn get_avatar_url_route( | ||||||
|     db: DatabaseGuard, |     db: DatabaseGuard, | ||||||
|     body: Ruma<get_avatar_url::Request<'_>>, |     body: Ruma<get_avatar_url::Request<'_>>, | ||||||
| ) -> ConduitResult<get_avatar_url::Response> { | ) -> ConduitResult<get_avatar_url::Response> { | ||||||
|  |     if body.user_id.server_name() != db.globals.server_name() { | ||||||
|  |         let response = db | ||||||
|  |             .sending | ||||||
|  |             .send_federation_request( | ||||||
|  |                 &db.globals, | ||||||
|  |                 body.user_id.server_name(), | ||||||
|  |                 federation::query::get_profile_information::v1::Request { | ||||||
|  |                     user_id: &body.user_id, | ||||||
|  |                     field: Some(&ProfileField::AvatarUrl), | ||||||
|  |                 }, | ||||||
|  |             ) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         return Ok(get_avatar_url::Response { | ||||||
|  |             avatar_url: response.avatar_url, | ||||||
|  |             blurhash: response.blurhash, | ||||||
|  |         } | ||||||
|  |         .into()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     Ok(get_avatar_url::Response { |     Ok(get_avatar_url::Response { | ||||||
|         avatar_url: db.users.avatar_url(&body.user_id)?, |         avatar_url: db.users.avatar_url(&body.user_id)?, | ||||||
|         blurhash: db.users.blurhash(&body.user_id)?, |         blurhash: db.users.blurhash(&body.user_id)?, | ||||||
|  | @ -250,6 +292,27 @@ pub async fn get_profile_route( | ||||||
|     db: DatabaseGuard, |     db: DatabaseGuard, | ||||||
|     body: Ruma<get_profile::Request<'_>>, |     body: Ruma<get_profile::Request<'_>>, | ||||||
| ) -> ConduitResult<get_profile::Response> { | ) -> ConduitResult<get_profile::Response> { | ||||||
|  |     if body.user_id.server_name() != db.globals.server_name() { | ||||||
|  |         let response = db | ||||||
|  |             .sending | ||||||
|  |             .send_federation_request( | ||||||
|  |                 &db.globals, | ||||||
|  |                 body.user_id.server_name(), | ||||||
|  |                 federation::query::get_profile_information::v1::Request { | ||||||
|  |                     user_id: &body.user_id, | ||||||
|  |                     field: None, | ||||||
|  |                 }, | ||||||
|  |             ) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         return Ok(get_profile::Response { | ||||||
|  |             displayname: response.displayname, | ||||||
|  |             avatar_url: response.avatar_url, | ||||||
|  |             blurhash: response.blurhash, | ||||||
|  |         } | ||||||
|  |         .into()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     if !db.users.exists(&body.user_id)? { |     if !db.users.exists(&body.user_id)? { | ||||||
|         // Return 404 if this user doesn't exist
 |         // Return 404 if this user doesn't exist
 | ||||||
|         return Err(Error::BadRequest( |         return Err(Error::BadRequest( | ||||||
|  |  | ||||||
|  | @ -421,7 +421,10 @@ async fn sync_helper( | ||||||
| 
 | 
 | ||||||
|             let send_member_count = state_events |             let send_member_count = state_events | ||||||
|                 .iter() |                 .iter() | ||||||
|                 .any(|event| event.kind == EventType::RoomMember); |                 .any(|event| event.kind == EventType::RoomMember) | ||||||
|  |                 || timeline_pdus.iter().any(|(_, event)| { | ||||||
|  |                     event.state_key.is_some() && event.kind == EventType::RoomMember | ||||||
|  |                 }); | ||||||
| 
 | 
 | ||||||
|             if encrypted_room { |             if encrypted_room { | ||||||
|                 for (user_id, current_member) in db |                 for (user_id, current_member) in db | ||||||
|  |  | ||||||
|  | @ -1,6 +1,12 @@ | ||||||
|  | use std::collections::BTreeMap; | ||||||
|  | 
 | ||||||
| use crate::{database::DatabaseGuard, ConduitResult, Error, Ruma}; | use crate::{database::DatabaseGuard, ConduitResult, Error, Ruma}; | ||||||
| use ruma::{ | use ruma::{ | ||||||
|     api::client::{error::ErrorKind, r0::to_device::send_event_to_device}, |     api::{ | ||||||
|  |         client::{error::ErrorKind, r0::to_device::send_event_to_device}, | ||||||
|  |         federation::{self, transactions::edu::DirectDeviceContent}, | ||||||
|  |     }, | ||||||
|  |     events::EventType, | ||||||
|     to_device::DeviceIdOrAllDevices, |     to_device::DeviceIdOrAllDevices, | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
|  | @ -33,6 +39,28 @@ pub async fn send_event_to_device_route( | ||||||
| 
 | 
 | ||||||
|     for (target_user_id, map) in &body.messages { |     for (target_user_id, map) in &body.messages { | ||||||
|         for (target_device_id_maybe, event) in map { |         for (target_device_id_maybe, event) in map { | ||||||
|  |             if target_user_id.server_name() != db.globals.server_name() { | ||||||
|  |                 let mut map = BTreeMap::new(); | ||||||
|  |                 map.insert(target_device_id_maybe.clone(), event.clone()); | ||||||
|  |                 let mut messages = BTreeMap::new(); | ||||||
|  |                 messages.insert(target_user_id.clone(), map); | ||||||
|  | 
 | ||||||
|  |                 db.sending.send_reliable_edu( | ||||||
|  |                     target_user_id.server_name(), | ||||||
|  |                     &serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice( | ||||||
|  |                         DirectDeviceContent { | ||||||
|  |                             sender: sender_user.clone(), | ||||||
|  |                             ev_type: EventType::from(&body.event_type), | ||||||
|  |                             message_id: body.txn_id.clone(), | ||||||
|  |                             messages, | ||||||
|  |                         }, | ||||||
|  |                     )) | ||||||
|  |                     .expect("DirectToDevice EDU can be serialized"), | ||||||
|  |                 )?; | ||||||
|  | 
 | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|             match target_device_id_maybe { |             match target_device_id_maybe { | ||||||
|                 DeviceIdOrAllDevices::DeviceId(target_device_id) => db.users.add_to_device_event( |                 DeviceIdOrAllDevices::DeviceId(target_device_id) => db.users.add_to_device_event( | ||||||
|                     sender_user, |                     sender_user, | ||||||
|  |  | ||||||
|  | @ -164,8 +164,8 @@ impl Sending { | ||||||
|                                 // Find events that have been added since starting the last request
 |                                 // Find events that have been added since starting the last request
 | ||||||
|                                 let new_events = guard.sending.servernamepduids |                                 let new_events = guard.sending.servernamepduids | ||||||
|                                     .scan_prefix(prefix.clone()) |                                     .scan_prefix(prefix.clone()) | ||||||
|                                     .map(|(k, _)| { |                                     .filter_map(|(k, _)| { | ||||||
|                                         SendingEventType::Pdu(k[prefix.len()..].to_vec()) |                                         Self::parse_servercurrentevent(&k).ok().map(|ev| (ev, k)) | ||||||
|                                     }) |                                     }) | ||||||
|                                     .take(30) |                                     .take(30) | ||||||
|                                     .collect::<Vec<_>>(); |                                     .collect::<Vec<_>>(); | ||||||
|  | @ -174,16 +174,9 @@ impl Sending { | ||||||
| 
 | 
 | ||||||
|                                 if !new_events.is_empty() { |                                 if !new_events.is_empty() { | ||||||
|                                     // Insert pdus we found
 |                                     // Insert pdus we found
 | ||||||
|                                     for event in &new_events { |                                     for (_, key) in &new_events { | ||||||
|                                         let mut current_key = prefix.clone(); |                                         guard.sending.servercurrentevents.insert(&key, &[]).unwrap(); | ||||||
|                                         match event { |                                         guard.sending.servernamepduids.remove(&key).unwrap(); | ||||||
|                                             SendingEventType::Pdu(b) | |  | ||||||
|                                             SendingEventType::Edu(b) => { |  | ||||||
|                                                 current_key.extend_from_slice(&b); |  | ||||||
|                                                 guard.sending.servercurrentevents.insert(¤t_key, &[]).unwrap(); |  | ||||||
|                                                 guard.sending.servernamepduids.remove(¤t_key).unwrap(); |  | ||||||
|                                              } |  | ||||||
|                                         } |  | ||||||
|                                     } |                                     } | ||||||
| 
 | 
 | ||||||
|                                     drop(guard); |                                     drop(guard); | ||||||
|  | @ -191,7 +184,7 @@ impl Sending { | ||||||
|                                     futures.push( |                                     futures.push( | ||||||
|                                         Self::handle_events( |                                         Self::handle_events( | ||||||
|                                             outgoing_kind.clone(), |                                             outgoing_kind.clone(), | ||||||
|                                             new_events, |                                             new_events.into_iter().map(|(event, _)| event.1).collect(), | ||||||
|                                             Arc::clone(&db), |                                             Arc::clone(&db), | ||||||
|                                         ) |                                         ) | ||||||
|                                     ); |                                     ); | ||||||
|  | @ -290,7 +283,8 @@ impl Sending { | ||||||
| 
 | 
 | ||||||
|             if let OutgoingKind::Normal(server_name) = outgoing_kind { |             if let OutgoingKind::Normal(server_name) = outgoing_kind { | ||||||
|                 if let Ok((select_edus, last_count)) = Self::select_edus(db, server_name) { |                 if let Ok((select_edus, last_count)) = Self::select_edus(db, server_name) { | ||||||
|                     events.extend_from_slice(&select_edus); |                     events.extend(select_edus.into_iter().map(SendingEventType::Edu)); | ||||||
|  | 
 | ||||||
|                     db.sending |                     db.sending | ||||||
|                         .servername_educount |                         .servername_educount | ||||||
|                         .insert(server_name.as_bytes(), &last_count.to_be_bytes())?; |                         .insert(server_name.as_bytes(), &last_count.to_be_bytes())?; | ||||||
|  | @ -301,7 +295,7 @@ impl Sending { | ||||||
|         Ok(Some(events)) |         Ok(Some(events)) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn select_edus(db: &Database, server: &ServerName) -> Result<(Vec<SendingEventType>, u64)> { |     pub fn select_edus(db: &Database, server: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> { | ||||||
|         // u64: count of last edu
 |         // u64: count of last edu
 | ||||||
|         let since = db |         let since = db | ||||||
|             .sending |             .sending | ||||||
|  | @ -366,9 +360,7 @@ impl Sending { | ||||||
|                     } |                     } | ||||||
|                 }; |                 }; | ||||||
| 
 | 
 | ||||||
|                 events.push(SendingEventType::Edu( |                 events.push(serde_json::to_vec(&federation_event).expect("json can be serialized")); | ||||||
|                     serde_json::to_vec(&federation_event).expect("json can be serialized"), |  | ||||||
|                 )); |  | ||||||
| 
 | 
 | ||||||
|                 if events.len() >= 20 { |                 if events.len() >= 20 { | ||||||
|                     break 'outer; |                     break 'outer; | ||||||
|  | @ -402,6 +394,18 @@ impl Sending { | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     #[tracing::instrument(skip(self))] | ||||||
|  |     pub fn send_reliable_edu(&self, server: &ServerName, serialized: &[u8]) -> Result<()> { | ||||||
|  |         let mut key = server.as_bytes().to_vec(); | ||||||
|  |         key.push(0xff); | ||||||
|  |         key.push(b'*'); | ||||||
|  |         key.extend_from_slice(serialized); | ||||||
|  |         self.servernamepduids.insert(&key, b"")?; | ||||||
|  |         self.sender.unbounded_send(key).unwrap(); | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     #[tracing::instrument(skip(self))] |     #[tracing::instrument(skip(self))] | ||||||
|     pub fn send_pdu_appservice(&self, appservice_id: &str, pdu_id: &[u8]) -> Result<()> { |     pub fn send_pdu_appservice(&self, appservice_id: &str, pdu_id: &[u8]) -> Result<()> { | ||||||
|         let mut key = b"+".to_vec(); |         let mut key = b"+".to_vec(); | ||||||
|  |  | ||||||
|  | @ -1027,42 +1027,39 @@ pub fn handle_incoming_pdu<'a>( | ||||||
|                 .map_err(|_| "Failed talking to db".to_owned())? |                 .map_err(|_| "Failed talking to db".to_owned())? | ||||||
|                 .map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok()) |                 .map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok()) | ||||||
|                 .flatten(); |                 .flatten(); | ||||||
|             if let Some(mut state) = state { |             if let Some(state) = state { | ||||||
|                 if db |                 let mut state = fetch_and_handle_events( | ||||||
|                     .rooms |                     db, | ||||||
|                     .get_pdu(prev_event) |                     origin, | ||||||
|                     .ok() |                     &state.into_iter().collect::<Vec<_>>(), | ||||||
|                     .flatten() |                     &room_id, | ||||||
|                     .ok_or_else(|| "Could not find prev event, but we know the state.".to_owned())? |                     pub_key_map, | ||||||
|                     .state_key |                 ) | ||||||
|                     .is_some() |                 .await | ||||||
|                 { |                 .map_err(|_| "Failed to fetch state events locally".to_owned())? | ||||||
|                     state.insert(prev_event.clone()); |                 .into_iter() | ||||||
|                 } |                 .map(|pdu| { | ||||||
|                 state_at_incoming_event = Some( |                     ( | ||||||
|                     fetch_and_handle_events( |  | ||||||
|                         db, |  | ||||||
|                         origin, |  | ||||||
|                         &state.into_iter().collect::<Vec<_>>(), |  | ||||||
|                         &room_id, |  | ||||||
|                         pub_key_map, |  | ||||||
|                     ) |  | ||||||
|                     .await |  | ||||||
|                     .map_err(|_| "Failed to fetch state events locally".to_owned())? |  | ||||||
|                     .into_iter() |  | ||||||
|                     .map(|pdu| { |  | ||||||
|                         ( |                         ( | ||||||
|                             ( |                             pdu.kind.clone(), | ||||||
|                                 pdu.kind.clone(), |                             pdu.state_key | ||||||
|                                 pdu.state_key |                                 .clone() | ||||||
|                                     .clone() |                                 .expect("events from state_full_ids are state events"), | ||||||
|                                     .expect("events from state_full_ids are state events"), |                         ), | ||||||
|                             ), |                         pdu, | ||||||
|                             pdu, |                     ) | ||||||
|                         ) |                 }) | ||||||
|                     }) |                 .collect::<HashMap<_, _>>(); | ||||||
|                     .collect(), | 
 | ||||||
|                 ); |                 let prev_pdu = db.rooms.get_pdu(prev_event).ok().flatten().ok_or_else(|| { | ||||||
|  |                     "Could not find prev event, but we know the state.".to_owned() | ||||||
|  |                 })?; | ||||||
|  | 
 | ||||||
|  |                 if let Some(state_key) = &prev_pdu.state_key { | ||||||
|  |                     state.insert((prev_pdu.kind.clone(), state_key.clone()), prev_pdu); | ||||||
|  |                 } | ||||||
|  | 
 | ||||||
|  |                 state_at_incoming_event = Some(state); | ||||||
|             } |             } | ||||||
|             // TODO: set incoming_auth_events?
 |             // TODO: set incoming_auth_events?
 | ||||||
|         } |         } | ||||||
|  | @ -2461,7 +2458,8 @@ pub async fn get_keys_route( | ||||||
|         &body.device_keys, |         &body.device_keys, | ||||||
|         |u| Some(u.server_name()) == body.sender_servername.as_deref(), |         |u| Some(u.server_name()) == body.sender_servername.as_deref(), | ||||||
|         &db, |         &db, | ||||||
|     )?; |     ) | ||||||
|  |     .await?; | ||||||
| 
 | 
 | ||||||
|     db.flush().await?; |     db.flush().await?; | ||||||
| 
 | 
 | ||||||
|  | @ -2486,7 +2484,7 @@ pub async fn claim_keys_route( | ||||||
|         return Err(Error::bad_config("Federation is disabled.")); |         return Err(Error::bad_config("Federation is disabled.")); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     let result = claim_keys_helper(&body.one_time_keys, &db)?; |     let result = claim_keys_helper(&body.one_time_keys, &db).await?; | ||||||
| 
 | 
 | ||||||
|     db.flush().await?; |     db.flush().await?; | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue