From a8bc619dca65678d1248dd5c53871238561234d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 9 Mar 2021 14:30:28 +0100 Subject: [PATCH] crypto: Encrypt room keys for a room key share request in parallel --- matrix_sdk_crypto/Cargo.toml | 3 +- .../src/session_manager/group_sessions.rs | 50 +++++++++++++------ 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/matrix_sdk_crypto/Cargo.toml b/matrix_sdk_crypto/Cargo.toml index c72497ab..d560f15b 100644 --- a/matrix_sdk_crypto/Cargo.toml +++ b/matrix_sdk_crypto/Cargo.toml @@ -29,6 +29,8 @@ serde_json = "1.0.61" zeroize = { version = "1.2.0", features = ["zeroize_derive"] } # Misc dependencies +tokio = { version = "1.1.0", default-features = false, features = ["rt", "rt-multi-thread"] } +futures = "0.3.12" sled = { version = "0.34.6", optional = true } thiserror = "1.0.23" tracing = "0.1.22" @@ -44,7 +46,6 @@ byteorder = "1.4.2" [dev-dependencies] tokio = { version = "1.1.0", default-features = false, features = ["rt-multi-thread", "macros"] } -futures = "0.3.12" proptest = "0.10.1" serde_json = "1.0.61" tempfile = "3.2.0" diff --git a/matrix_sdk_crypto/src/session_manager/group_sessions.rs b/matrix_sdk_crypto/src/session_manager/group_sessions.rs index f6b27588..f033a99a 100644 --- a/matrix_sdk_crypto/src/session_manager/group_sessions.rs +++ b/matrix_sdk_crypto/src/session_manager/group_sessions.rs @@ -17,6 +17,10 @@ use std::{ sync::Arc, }; +use futures::future::join_all; + +use tokio; + use dashmap::DashMap; use matrix_sdk_common::{ api::r0::to_device::DeviceIdOrAllDevices, @@ -195,28 +199,46 @@ impl GroupSessionManager { let mut messages = BTreeMap::new(); let mut changed_sessions = Vec::new(); - for device in devices { + let encrypt = |device: Device, content: Value| async move { + let mut message = BTreeMap::new(); + let encrypted = device.encrypt(EventType::RoomKey, content.clone()).await; - let (used_session, encrypted) = match encrypted { - Ok(c) => c, + let used_session = match encrypted { + Ok((session, encrypted)) => { + message + .entry(device.user_id().clone()) + .or_insert_with(BTreeMap::new) + .insert( + DeviceIdOrAllDevices::DeviceId(device.device_id().into()), + serde_json::value::to_raw_value(&encrypted)?, + ); + Some(session) + } // TODO we'll want to create m.room_key.withheld here. Err(OlmError::MissingSession) - | Err(OlmError::EventError(EventError::MissingSenderKey)) => { - continue; - } + | Err(OlmError::EventError(EventError::MissingSenderKey)) => None, Err(e) => return Err(e), }; - changed_sessions.push(used_session); + Ok((used_session, message)) + }; - messages - .entry(device.user_id().clone()) - .or_insert_with(BTreeMap::new) - .insert( - DeviceIdOrAllDevices::DeviceId(device.device_id().into()), - serde_json::value::to_raw_value(&encrypted)?, - ); + let tasks: Vec<_> = devices + .iter() + .map(|d| tokio::spawn(encrypt(d.clone(), content.clone()))) + .collect(); + + let results = join_all(tasks).await; + + for result in results { + let (used_session, message) = result.expect("Encryption task paniced")?; + + if let Some(session) = used_session { + changed_sessions.push(session); + } + + messages.extend(message); } let id = Uuid::new_v4();