crypto: Encrypt the share group session requests in parallel.

master
Damir Jelić 2021-03-09 15:09:59 +01:00
parent a8bc619dca
commit 560aa5b0a9
1 changed files with 34 additions and 13 deletions

View File

@ -192,9 +192,8 @@ impl GroupSessionManager {
/// Encrypt the given content for the given devices and create a to-device /// Encrypt the given content for the given devices and create a to-device
/// requests that sends the encrypted content to them. /// requests that sends the encrypted content to them.
async fn encrypt_session_for( async fn encrypt_session_for(
&self,
content: Value, content: Value,
devices: &[Device], devices: Vec<Device>,
) -> OlmResult<(Uuid, ToDeviceRequest, Vec<Session>)> { ) -> OlmResult<(Uuid, ToDeviceRequest, Vec<Session>)> {
let mut messages = BTreeMap::new(); let mut messages = BTreeMap::new();
let mut changed_sessions = Vec::new(); let mut changed_sessions = Vec::new();
@ -356,6 +355,24 @@ impl GroupSessionManager {
Ok((should_rotate, devices)) Ok((should_rotate, devices))
} }
pub async fn encrypt_request(
chunk: Vec<Device>,
content: Value,
outbound: OutboundGroupSession,
message_index: u32,
being_shared: Arc<DashMap<Uuid, OutboundGroupSession>>,
) -> OlmResult<Vec<Session>> {
let (id, request, used_sessions) =
Self::encrypt_session_for(content.clone(), chunk).await?;
if !request.messages.is_empty() {
outbound.add_request(id, request.into(), message_index);
being_shared.insert(id, outbound.clone());
}
Ok(used_sessions)
}
/// Get to-device requests to share a group session with users in a room. /// Get to-device requests to share a group session with users in a room.
/// ///
/// # Arguments /// # Arguments
@ -449,18 +466,22 @@ impl GroupSessionManager {
); );
} }
for device_map_chunk in devices.chunks(Self::MAX_TO_DEVICE_MESSAGES) { let tasks: Vec<_> = devices
let (id, request, used_sessions) = self .chunks(Self::MAX_TO_DEVICE_MESSAGES)
.encrypt_session_for(key_content.clone(), device_map_chunk) .map(|chunk| {
.await?; tokio::spawn(Self::encrypt_request(
chunk.to_vec(),
key_content.clone(),
outbound.clone(),
message_index,
self.outbound_sessions_being_shared.clone(),
))
})
.collect();
if !request.messages.is_empty() { for result in join_all(tasks).await {
outbound.add_request(id, request.into(), message_index); let used_sessions: OlmResult<Vec<Session>> = result.expect("Encryption task paniced");
self.outbound_sessions_being_shared changes.sessions.extend(used_sessions?);
.insert(id, outbound.clone());
}
changes.sessions.extend(used_sessions);
} }
let requests = outbound.pending_requests(); let requests = outbound.pending_requests();