diff --git a/matrix_sdk_crypto/src/identities/manager.rs b/matrix_sdk_crypto/src/identities/manager.rs index 12f955a1..080a6432 100644 --- a/matrix_sdk_crypto/src/identities/manager.rs +++ b/matrix_sdk_crypto/src/identities/manager.rs @@ -23,7 +23,7 @@ use tracing::{trace, warn}; use matrix_sdk_common::{ api::r0::keys::get_keys::Response as KeysQueryResponse, encryption::DeviceKeys, - identifiers::{DeviceId, DeviceIdBox, UserId}, + identifiers::{DeviceIdBox, UserId}, }; use crate::{ @@ -64,10 +64,6 @@ impl IdentityManager { &self.user_id } - fn device_id(&self) -> &DeviceId { - &self.device_id - } - /// Receive a successful keys query response. /// /// Returns a list of devices newly discovered devices and devices that @@ -82,7 +78,7 @@ impl IdentityManager { response: &KeysQueryResponse, ) -> OlmResult<(DeviceChanges, IdentityChanges)> { let changed_devices = self - .handle_devices_from_key_query(&response.device_keys) + .handle_devices_from_key_query(response.device_keys.clone()) .await?; let changed_identities = self.handle_cross_singing_keys(response).await?; @@ -140,6 +136,63 @@ impl IdentityManager { } } + async fn update_user_devices( + store: Store, + own_user_id: Arc, + own_device_id: Arc, + user_id: UserId, + device_map: BTreeMap, + ) -> StoreResult { + let mut changes = DeviceChanges::default(); + + let current_devices: HashSet = device_map.keys().cloned().collect(); + + let tasks = device_map + .into_iter() + .filter_map(|(device_id, device_keys)| { + // We don't need our own device in the device store. + if user_id == *own_user_id && device_id == *own_device_id { + None + } else if user_id != device_keys.user_id || device_id != device_keys.device_id { + warn!( + "Mismatch in device keys payload of device {}|{} from user {}|{}", + device_id, device_keys.device_id, user_id, device_keys.user_id + ); + None + } else { + Some(tokio::spawn(Self::update_or_create_device( + store.clone(), + device_keys, + ))) + } + }); + + let results = join_all(tasks).await; + + for device in results { + match device.expect("Creating or updating a device panicked")? { + DeviceChange::New(d) => changes.new.push(d), + DeviceChange::Updated(d) => changes.changed.push(d), + DeviceChange::None => (), + } + } + + let current_devices: HashSet<&DeviceIdBox> = current_devices.iter().collect(); + let stored_devices = store.get_readonly_devices(&user_id).await?; + let stored_devices_set: HashSet<&DeviceIdBox> = stored_devices.keys().collect(); + + let deleted_devices_set = stored_devices_set.difference(¤t_devices); + + for device_id in deleted_devices_set { + if let Some(device) = stored_devices.get(*device_id) { + device.mark_as_deleted(); + changes.deleted.push(device.clone()); + } + } + + Ok(changes) + } + /// Handle the device keys part of a key query response. /// /// # Arguments @@ -151,51 +204,27 @@ impl IdentityManager { /// they are new, one of their properties has changed or they got deleted. async fn handle_devices_from_key_query( &self, - device_keys_map: &BTreeMap>, + device_keys_map: BTreeMap>, ) -> StoreResult { let mut changes = DeviceChanges::default(); - for (user_id, device_map) in device_keys_map { - let tasks = device_map.iter().filter_map(|(device_id, device_keys)| { - // We don't need our own device in the device store. - if user_id == self.user_id() && &**device_id == self.device_id() { - None - } else if user_id != &device_keys.user_id || device_id != &device_keys.device_id { - warn!( - "Mismatch in device keys payload of device {}|{} from user {}|{}", - device_id, device_keys.device_id, user_id, device_keys.user_id - ); - None - } else { - Some(tokio::spawn(Self::update_or_create_device( - self.store.clone(), - device_keys.clone(), - ))) - } + let tasks = device_keys_map + .into_iter() + .map(|(user_id, device_keys_map)| { + tokio::spawn(Self::update_user_devices( + self.store.clone(), + self.user_id.clone(), + self.device_id.clone(), + user_id, + device_keys_map, + )) }); - let results = join_all(tasks).await; + let results = join_all(tasks).await; - for device in results { - match device.expect("Creating or updating a device panicked")? { - DeviceChange::New(d) => changes.new.push(d), - DeviceChange::Updated(d) => changes.changed.push(d), - DeviceChange::None => (), - } - } - - let current_devices: HashSet<&DeviceIdBox> = device_map.keys().collect(); - let stored_devices = self.store.get_readonly_devices(&user_id).await?; - let stored_devices_set: HashSet<&DeviceIdBox> = stored_devices.keys().collect(); - - let deleted_devices_set = stored_devices_set.difference(¤t_devices); - - for device_id in deleted_devices_set { - if let Some(device) = stored_devices.get(*device_id) { - device.mark_as_deleted(); - changes.deleted.push(device.clone()); - } - } + for result in results { + let change_fragment = result.expect("Panic while updating user devices")?; + changes.extend(change_fragment); } Ok(changes) diff --git a/matrix_sdk_crypto/src/store/mod.rs b/matrix_sdk_crypto/src/store/mod.rs index b72e5f01..e8e21f3e 100644 --- a/matrix_sdk_crypto/src/store/mod.rs +++ b/matrix_sdk_crypto/src/store/mod.rs @@ -126,6 +126,15 @@ pub struct DeviceChanges { pub deleted: Vec, } +impl DeviceChanges { + /// Merge the given `DeviceChanges` into this instance of `DeviceChanges`. + pub fn extend(&mut self, other: DeviceChanges) { + self.new.extend(other.new); + self.changed.extend(other.changed); + self.deleted.extend(other.deleted); + } +} + impl Store { pub fn new( user_id: Arc,