diff --git a/matrix_sdk_crypto/src/session_manager/group_sessions.rs b/matrix_sdk_crypto/src/session_manager/group_sessions.rs index f033a99a..c6b852db 100644 --- a/matrix_sdk_crypto/src/session_manager/group_sessions.rs +++ b/matrix_sdk_crypto/src/session_manager/group_sessions.rs @@ -192,9 +192,8 @@ impl GroupSessionManager { /// Encrypt the given content for the given devices and create a to-device /// requests that sends the encrypted content to them. async fn encrypt_session_for( - &self, content: Value, - devices: &[Device], + devices: Vec, ) -> OlmResult<(Uuid, ToDeviceRequest, Vec)> { let mut messages = BTreeMap::new(); let mut changed_sessions = Vec::new(); @@ -356,6 +355,24 @@ impl GroupSessionManager { Ok((should_rotate, devices)) } + pub async fn encrypt_request( + chunk: Vec, + content: Value, + outbound: OutboundGroupSession, + message_index: u32, + being_shared: Arc>, + ) -> OlmResult> { + let (id, request, used_sessions) = + Self::encrypt_session_for(content.clone(), chunk).await?; + + if !request.messages.is_empty() { + outbound.add_request(id, request.into(), message_index); + being_shared.insert(id, outbound.clone()); + } + + Ok(used_sessions) + } + /// Get to-device requests to share a group session with users in a room. /// /// # Arguments @@ -449,18 +466,22 @@ impl GroupSessionManager { ); } - for device_map_chunk in devices.chunks(Self::MAX_TO_DEVICE_MESSAGES) { - let (id, request, used_sessions) = self - .encrypt_session_for(key_content.clone(), device_map_chunk) - .await?; + let tasks: Vec<_> = devices + .chunks(Self::MAX_TO_DEVICE_MESSAGES) + .map(|chunk| { + tokio::spawn(Self::encrypt_request( + chunk.to_vec(), + key_content.clone(), + outbound.clone(), + message_index, + self.outbound_sessions_being_shared.clone(), + )) + }) + .collect(); - if !request.messages.is_empty() { - outbound.add_request(id, request.into(), message_index); - self.outbound_sessions_being_shared - .insert(id, outbound.clone()); - } - - changes.sessions.extend(used_sessions); + for result in join_all(tasks).await { + let used_sessions: OlmResult> = result.expect("Encryption task paniced"); + changes.sessions.extend(used_sessions?); } let requests = outbound.pending_requests();