crypto: Encrypt room keys for a room key share request in parallel

This commit is contained in:
Damir Jelić 2021-03-09 14:30:28 +01:00
parent 91c326e970
commit a8bc619dca
2 changed files with 38 additions and 15 deletions

View file

@ -29,6 +29,8 @@ serde_json = "1.0.61"
zeroize = { version = "1.2.0", features = ["zeroize_derive"] }
# Misc dependencies
tokio = { version = "1.1.0", default-features = false, features = ["rt", "rt-multi-thread"] }
futures = "0.3.12"
sled = { version = "0.34.6", optional = true }
thiserror = "1.0.23"
tracing = "0.1.22"
@ -44,7 +46,6 @@ byteorder = "1.4.2"
[dev-dependencies]
tokio = { version = "1.1.0", default-features = false, features = ["rt-multi-thread", "macros"] }
futures = "0.3.12"
proptest = "0.10.1"
serde_json = "1.0.61"
tempfile = "3.2.0"

View file

@ -17,6 +17,10 @@ use std::{
sync::Arc,
};
use futures::future::join_all;
use tokio;
use dashmap::DashMap;
use matrix_sdk_common::{
api::r0::to_device::DeviceIdOrAllDevices,
@ -195,28 +199,46 @@ impl GroupSessionManager {
let mut messages = BTreeMap::new();
let mut changed_sessions = Vec::new();
for device in devices {
let encrypt = |device: Device, content: Value| async move {
let mut message = BTreeMap::new();
let encrypted = device.encrypt(EventType::RoomKey, content.clone()).await;
let (used_session, encrypted) = match encrypted {
Ok(c) => c,
let used_session = match encrypted {
Ok((session, encrypted)) => {
message
.entry(device.user_id().clone())
.or_insert_with(BTreeMap::new)
.insert(
DeviceIdOrAllDevices::DeviceId(device.device_id().into()),
serde_json::value::to_raw_value(&encrypted)?,
);
Some(session)
}
// TODO we'll want to create m.room_key.withheld here.
Err(OlmError::MissingSession)
| Err(OlmError::EventError(EventError::MissingSenderKey)) => {
continue;
}
| Err(OlmError::EventError(EventError::MissingSenderKey)) => None,
Err(e) => return Err(e),
};
changed_sessions.push(used_session);
Ok((used_session, message))
};
messages
.entry(device.user_id().clone())
.or_insert_with(BTreeMap::new)
.insert(
DeviceIdOrAllDevices::DeviceId(device.device_id().into()),
serde_json::value::to_raw_value(&encrypted)?,
);
let tasks: Vec<_> = devices
.iter()
.map(|d| tokio::spawn(encrypt(d.clone(), content.clone())))
.collect();
let results = join_all(tasks).await;
for result in results {
let (used_session, message) = result.expect("Encryption task paniced")?;
if let Some(session) = used_session {
changed_sessions.push(session);
}
messages.extend(message);
}
let id = Uuid::new_v4();