diff --git a/matrix_sdk_crypto/benches/crypto_bench.rs b/matrix_sdk_crypto/benches/crypto_bench.rs index 3f61feca..845e959a 100644 --- a/matrix_sdk_crypto/benches/crypto_bench.rs +++ b/matrix_sdk_crypto/benches/crypto_bench.rs @@ -72,13 +72,14 @@ pub fn keys_query(c: &mut Criterion) { ); let dir = tempfile::tempdir().unwrap(); - let machine = block_on(OlmMachine::new_with_default_store( - &alice_id(), - &alice_device_id(), - dir.path(), - None, - )) - .unwrap(); + let machine = runtime + .block_on(OlmMachine::new_with_default_store( + &alice_id(), + &alice_device_id(), + dir.path(), + None, + )) + .unwrap(); group.bench_with_input( BenchmarkId::new("sled store", &name), @@ -169,8 +170,12 @@ pub fn room_key_sharing(c: &mut Criterion) { .fold(0, |acc, d| acc + d.len()); let machine = OlmMachine::new(&alice_id(), &alice_device_id()); - block_on(machine.mark_request_as_sent(&uuid, &keys_query_response)).unwrap(); - block_on(machine.mark_request_as_sent(&uuid, &response)).unwrap(); + runtime + .block_on(machine.mark_request_as_sent(&uuid, &keys_query_response)) + .unwrap(); + runtime + .block_on(machine.mark_request_as_sent(&uuid, &response)) + .unwrap(); let mut group = c.benchmark_group("Room key sharing"); group.throughput(Throughput::Elements(count as u64)); @@ -204,8 +209,12 @@ pub fn room_key_sharing(c: &mut Criterion) { None, )) .unwrap(); - block_on(machine.mark_request_as_sent(&uuid, &keys_query_response)).unwrap(); - block_on(machine.mark_request_as_sent(&uuid, &response)).unwrap(); + runtime + .block_on(machine.mark_request_as_sent(&uuid, &keys_query_response)) + .unwrap(); + runtime + .block_on(machine.mark_request_as_sent(&uuid, &response)) + .unwrap(); group.bench_function(BenchmarkId::new("sled store", &name), |b| { b.to_async(&runtime).iter(|| async { diff --git a/matrix_sdk_crypto/src/identities/manager.rs b/matrix_sdk_crypto/src/identities/manager.rs index bbef7f35..4235a650 100644 --- a/matrix_sdk_crypto/src/identities/manager.rs +++ b/matrix_sdk_crypto/src/identities/manager.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use futures::future::join_all; use std::{ collections::{BTreeMap, HashSet}, convert::TryFrom, sync::Arc, }; -use tracing::{info, trace, warn}; +use tracing::{trace, warn}; use matrix_sdk_common::{ api::r0::keys::get_keys::Response as KeysQueryResponse, @@ -35,6 +36,12 @@ use crate::{ store::{Changes, DeviceChanges, IdentityChanges, Result as StoreResult, Store}, }; +enum DeviceChange { + New(ReadOnlyDevice), + Updated(ReadOnlyDevice), + None, +} + #[derive(Debug, Clone)] pub(crate) struct IdentityManager { user_id: Arc, @@ -90,6 +97,43 @@ impl IdentityManager { Ok((changed_devices, changed_identities)) } + async fn update_or_create_device( + store: Store, + device_keys: DeviceKeys, + ) -> StoreResult { + let old_device = store + .get_readonly_device(&device_keys.user_id, &device_keys.device_id) + .await?; + + if let Some(mut device) = old_device { + if let Err(e) = device.update_device(&device_keys) { + warn!( + "Failed to update the device keys for {} {}: {:?}", + device.user_id(), + device.device_id(), + e + ); + Ok(DeviceChange::None) + } else { + Ok(DeviceChange::Updated(device)) + } + } else { + match ReadOnlyDevice::try_from(&device_keys) { + Ok(d) => { + trace!("Adding a new device to the device store {:?}", d); + Ok(DeviceChange::New(d)) + } + Err(e) => { + warn!( + "Failed to create a new device for {} {}: {:?}", + device_keys.user_id, device_keys.device_id, e + ); + Ok(DeviceChange::None) + } + } + } + } + /// Handle the device keys part of a key query response. /// /// # Arguments @@ -107,48 +151,35 @@ impl IdentityManager { for (user_id, device_map) in device_keys_map { // TODO move this out into the handle keys query response method - // since we might fail handle the new device at any point here or + // since we might fail to handle the new device at any point here or // when updating the user identities. self.store.update_tracked_user(user_id, false).await?; - for (device_id, device_keys) in device_map.iter() { + let tasks = device_map.iter().filter_map(|(device_id, device_keys)| { // We don't need our own device in the device store. if user_id == self.user_id() && &**device_id == self.device_id() { - continue; - } - - if user_id != &device_keys.user_id || device_id != &device_keys.device_id { + None + } else if user_id != &device_keys.user_id || device_id != &device_keys.device_id { warn!( "Mismatch in device keys payload of device {}|{} from user {}|{}", device_id, device_keys.device_id, user_id, device_keys.user_id ); - continue; - } - - let device = self.store.get_readonly_device(&user_id, device_id).await?; - - if let Some(mut device) = device { - if let Err(e) = device.update_device(device_keys) { - warn!( - "Failed to update the device keys for {} {}: {:?}", - user_id, device_id, e - ); - continue; - } - changes.changed.push(device); + None } else { - let device = match ReadOnlyDevice::try_from(device_keys) { - Ok(d) => d, - Err(e) => { - warn!( - "Failed to create a new device for {} {}: {:?}", - user_id, device_id, e - ); - continue; - } - }; - info!("Adding a new device to the device store {:?}", device); - changes.new.push(device); + Some(tokio::spawn(Self::update_or_create_device( + self.store.clone(), + device_keys.clone(), + ))) + } + }); + + let results = join_all(tasks).await; + + for device in results { + match device.expect("Creating or updating a device panicked")? { + DeviceChange::New(d) => changes.new.push(d), + DeviceChange::Updated(d) => changes.changed.push(d), + DeviceChange::None => (), } }