From ff2079da91a501ab3d8ce6fb6d1e594be8af54fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 30 Sep 2020 15:43:25 +0200 Subject: [PATCH] crypto: Move the group session handling logic into separate module. --- matrix_sdk_crypto/src/group_manager.rs | 228 +++++++++++++++++++++ matrix_sdk_crypto/src/identities/device.rs | 1 + matrix_sdk_crypto/src/key_request.rs | 12 +- matrix_sdk_crypto/src/lib.rs | 1 + matrix_sdk_crypto/src/machine.rs | 172 ++++------------ 5 files changed, 275 insertions(+), 139 deletions(-) create mode 100644 matrix_sdk_crypto/src/group_manager.rs diff --git a/matrix_sdk_crypto/src/group_manager.rs b/matrix_sdk_crypto/src/group_manager.rs new file mode 100644 index 00000000..1c258d46 --- /dev/null +++ b/matrix_sdk_crypto/src/group_manager.rs @@ -0,0 +1,228 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::BTreeMap, sync::Arc}; + +use dashmap::DashMap; +use matrix_sdk_common::{ + api::r0::to_device::DeviceIdOrAllDevices, + events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, EventType}, + identifiers::{RoomId, UserId}, + uuid::Uuid, +}; + +use crate::{ + error::{EventError, MegolmResult, OlmResult}, + key_request::Device, + olm::{Account, OutboundGroupSession}, + store::{Result as StoreResult, Store}, + EncryptionSettings, OlmError, ToDeviceRequest, +}; + +#[derive(Clone)] +pub struct GroupSessionManager { + account: Account, + /// Store for the encryption keys. + /// Persists all the encryption keys so a client can resume the session + /// without the need to create new keys. + store: Store, + /// The currently active outbound group sessions. + outbound_group_sessions: Arc>, + outbound_sessions_being_shared: Arc>, +} + +impl GroupSessionManager { + const MAX_TO_DEVICE_MESSAGES: usize = 20; + + pub(crate) fn new(account: Account, store: Store) -> Self { + Self { + account, + store, + outbound_group_sessions: Arc::new(DashMap::new()), + outbound_sessions_being_shared: Arc::new(DashMap::new()), + } + } + + async fn get_user_devices(&self, user_id: &UserId) -> StoreResult> { + let devices = self.store.get_user_devices(user_id).await?; + + let own_identity = self + .store + .get_user_identity(self.account.user_id()) + .await? + .map(|i| i.own().cloned()) + .flatten(); + let device_owner_identity = self.store.get_user_identity(user_id).await.ok().flatten(); + + Ok(devices + .devices() + .map(|d| Device { + inner: d.clone(), + store: self.store.clone(), + own_identity: own_identity.clone(), + device_owner_identity: device_owner_identity.clone(), + }) + .collect()) + } + + pub fn invalidate_group_session(&self, room_id: &RoomId) -> bool { + self.outbound_group_sessions.remove(room_id).is_some() + } + + /// Get an outbound group session for a room, if one exists. + /// + /// # Arguments + /// + /// * `room_id` - The id of the room for which we should get the outbound + /// group session. + pub fn get_outbound_group_session(&self, room_id: &RoomId) -> Option { + #[allow(clippy::map_clone)] + self.outbound_group_sessions.get(room_id).map(|s| s.clone()) + } + + pub async fn encrypt( + &self, + room_id: &RoomId, + content: AnyMessageEventContent, + ) -> MegolmResult { + let session = if let Some(s) = self.get_outbound_group_session(room_id) { + s + } else { + panic!("Session wasn't created nor shared"); + }; + + if session.expired() { + panic!("Session expired"); + } + + Ok(session.encrypt(content).await) + } + + /// Should the client share a group session for the given room. + /// + /// Returns true if a session needs to be shared before room messages can be + /// encrypted, false if one is already shared and ready to encrypt room + /// messages. + /// + /// This should be called every time a new room message wants to be sent out + /// since group sessions can expire at any time. + pub fn should_share_group_session(&self, room_id: &RoomId) -> bool { + let session = self.outbound_group_sessions.get(room_id); + + match session { + Some(s) => !s.shared() || s.expired(), + None => true, + } + } + + /// Create a new outbound group session. + /// + /// This also creates a matching inbound group session and saves that one in + /// the store. + pub async fn create_outbound_group_session( + &self, + room_id: &RoomId, + settings: EncryptionSettings, + users_to_share_with: impl Iterator, + ) -> OlmResult<()> { + let (outbound, inbound) = self + .account + .create_group_session_pair(room_id, settings, users_to_share_with) + .await + .map_err(|_| EventError::UnsupportedAlgorithm)?; + + let _ = self.store.save_inbound_group_sessions(&[inbound]).await?; + + let _ = self + .outbound_group_sessions + .insert(room_id.to_owned(), outbound); + Ok(()) + } + + /// Get to-device requests to share a group session with users in a room. + /// + /// # Arguments + /// + /// `room_id` - The room id of the room where the group session will be + /// used. + /// + /// `users` - The list of users that should receive the group session. + pub async fn share_group_session( + &self, + room_id: &RoomId, + users: impl Iterator, + encryption_settings: impl Into, + ) -> OlmResult> { + self.create_outbound_group_session(room_id, encryption_settings.into(), users) + .await?; + let session = self.outbound_group_sessions.get(room_id).unwrap(); + + if session.shared() { + panic!("Session is already shared"); + } + + // TODO don't mark the session as shared automatically, only when all + // the requests are done, failure to send these requests will likely end + // up in wedged sessions. We'll need to store the requests and let the + // caller mark them as sent using an UUID. + session.mark_as_shared(); + + let mut devices = Vec::new(); + + for user_id in session.users_to_share_with() { + let mut user_devices = self.get_user_devices(&user_id).await?; + devices.extend(user_devices.drain(..).filter(|d| !d.is_blacklisted())) + } + + let mut requests = Vec::new(); + let key_content = session.as_json().await; + + for device_map_chunk in devices.chunks(Self::MAX_TO_DEVICE_MESSAGES) { + let mut messages = BTreeMap::new(); + + for device in device_map_chunk { + let encrypted = device + .encrypt(EventType::RoomKey, key_content.clone()) + .await; + + let encrypted = match encrypted { + Ok(c) => c, + Err(OlmError::MissingSession) + | Err(OlmError::EventError(EventError::MissingSenderKey)) => { + continue; + } + Err(e) => return Err(e), + }; + + messages + .entry(device.user_id().clone()) + .or_insert_with(BTreeMap::new) + .insert( + DeviceIdOrAllDevices::DeviceId(device.device_id().into()), + serde_json::value::to_raw_value(&encrypted)?, + ); + } + + let id = Uuid::new_v4(); + + requests.push(ToDeviceRequest { + event_type: EventType::RoomEncrypted, + txn_id: id, + messages, + }); + } + + Ok(requests) + } +} diff --git a/matrix_sdk_crypto/src/identities/device.rs b/matrix_sdk_crypto/src/identities/device.rs index 419ead58..404a5b2a 100644 --- a/matrix_sdk_crypto/src/identities/device.rs +++ b/matrix_sdk_crypto/src/identities/device.rs @@ -115,6 +115,7 @@ impl Device { /// * `event_type` - The type of the event. /// /// * `content` - The content of the event that should be encrypted. + #[cfg(test)] pub(crate) async fn encrypt( &self, event_type: EventType, diff --git a/matrix_sdk_crypto/src/key_request.rs b/matrix_sdk_crypto/src/key_request.rs index 4372aeb3..787c48e2 100644 --- a/matrix_sdk_crypto/src/key_request.rs +++ b/matrix_sdk_crypto/src/key_request.rs @@ -50,11 +50,11 @@ use crate::{ store::{CryptoStoreError, Store}, }; -struct Device { - inner: ReadOnlyDevice, - store: Store, - own_identity: Option, - device_owner_identity: Option, +pub struct Device { + pub(crate) inner: ReadOnlyDevice, + pub(crate) store: Store, + pub(crate) own_identity: Option, + pub(crate) device_owner_identity: Option, } impl Device { @@ -88,7 +88,7 @@ impl Device { .trust_state(&self.own_identity, &self.device_owner_identity) } - async fn encrypt( + pub async fn encrypt( &self, event_type: EventType, content: Value, diff --git a/matrix_sdk_crypto/src/lib.rs b/matrix_sdk_crypto/src/lib.rs index 14886910..69096b2f 100644 --- a/matrix_sdk_crypto/src/lib.rs +++ b/matrix_sdk_crypto/src/lib.rs @@ -29,6 +29,7 @@ mod error; mod file_encryption; +mod group_manager; mod identities; mod key_request; mod machine; diff --git a/matrix_sdk_crypto/src/machine.rs b/matrix_sdk_crypto/src/machine.rs index f43bd217..3c51c69f 100644 --- a/matrix_sdk_crypto/src/machine.rs +++ b/matrix_sdk_crypto/src/machine.rs @@ -27,12 +27,11 @@ use matrix_sdk_common::{ upload_keys, }, sync::sync_events::Response as SyncResponse, - to_device::DeviceIdOrAllDevices, }, assign, events::{ room::encrypted::EncryptedEventContent, room_key::RoomKeyEventContent, - AnyMessageEventContent, AnySyncRoomEvent, AnyToDeviceEvent, EventType, SyncMessageEvent, + AnyMessageEventContent, AnySyncRoomEvent, AnyToDeviceEvent, SyncMessageEvent, ToDeviceEvent, }, identifiers::{ @@ -44,18 +43,20 @@ use matrix_sdk_common::{ }; #[cfg(feature = "sqlite_cryptostore")] -use super::store::sqlite::SqliteStore; -use super::{ - error::{EventError, MegolmError, MegolmResult, OlmError, OlmResult}, +use crate::store::sqlite::SqliteStore; +use crate::{ + error::{EventError, MegolmError, MegolmResult, OlmResult}, + group_manager::GroupSessionManager, identities::{Device, IdentityManager, ReadOnlyDevice, UserDevices, UserIdentities}, key_request::KeyRequestMachine, olm::{ Account, EncryptionSettings, ExportedRoomKey, GroupSessionKey, IdentityKeys, - InboundGroupSession, OutboundGroupSession, ReadOnlyAccount, + InboundGroupSession, ReadOnlyAccount, }, - requests::{IncomingResponse, OutgoingRequest, ToDeviceRequest}, + requests::{IncomingResponse, OutgoingRequest}, store::{CryptoStore, MemoryStore, Result as StoreResult, Store}, verification::{Sas, VerificationMachine}, + ToDeviceRequest, }; /// State machine implementation of the Olm/Megolm encryption protocol used for @@ -72,8 +73,8 @@ pub struct OlmMachine { /// Persists all the encryption keys so a client can resume the session /// without the need to create new keys. store: Store, - /// The currently active outbound group sessions. - outbound_group_sessions: Arc>, + /// A state machine that keeps track of our outbound group sessions. + group_session_manager: GroupSessionManager, /// A state machine that is responsible to handle and keep track of SAS /// verification flows. verification_machine: VerificationMachine, @@ -96,7 +97,6 @@ impl std::fmt::Debug for OlmMachine { } impl OlmMachine { - const MAX_TO_DEVICE_MESSAGES: usize = 20; const KEY_CLAIM_TIMEOUT: Duration = Duration::from_secs(10); /// Create a new memory based OlmMachine. @@ -137,16 +137,19 @@ impl OlmMachine { ); let identity_manager = IdentityManager::new(user_id.clone(), device_id.clone(), store.clone()); + let account = Account { + inner: account, + store: store.clone(), + }; + + let group_session_manager = GroupSessionManager::new(account.clone(), store.clone()); OlmMachine { user_id, device_id, - account: Account { - inner: account, - store: store.clone(), - }, + account, store, - outbound_group_sessions, + group_session_manager, verification_machine, key_request_machine, identity_manager, @@ -564,50 +567,16 @@ impl OlmMachine { } } - /// Create a new outbound group session. - /// - /// This also creates a matching inbound group session and saves that one in - /// the store. - async fn create_outbound_group_session( - &self, - room_id: &RoomId, - settings: EncryptionSettings, - users_to_share_with: impl Iterator, - ) -> OlmResult<()> { - let (outbound, inbound) = self - .account - .create_group_session_pair(room_id, settings, users_to_share_with) - .await - .map_err(|_| EventError::UnsupportedAlgorithm)?; - - let _ = self.store.save_inbound_group_sessions(&[inbound]).await?; - - let _ = self - .outbound_group_sessions - .insert(room_id.to_owned(), outbound); - Ok(()) - } - #[cfg(test)] pub(crate) async fn create_outbound_group_session_with_defaults( &self, room_id: &RoomId, ) -> OlmResult<()> { - self.create_outbound_group_session(room_id, EncryptionSettings::default(), [].iter()) + self.group_session_manager + .create_outbound_group_session(room_id, EncryptionSettings::default(), [].iter()) .await } - /// Get an outbound group session for a room, if one exists. - /// - /// # Arguments - /// - /// * `room_id` - The id of the room for which we should get the outbound - /// group session. - fn get_outbound_group_session(&self, room_id: &RoomId) -> Option { - #[allow(clippy::map_clone)] - self.outbound_group_sessions.get(room_id).map(|s| s.clone()) - } - /// Encrypt a room message for the given room. /// /// Beware that a group session needs to be shared before this method can be @@ -637,17 +606,7 @@ impl OlmMachine { room_id: &RoomId, content: AnyMessageEventContent, ) -> MegolmResult { - let session = if let Some(s) = self.get_outbound_group_session(room_id) { - s - } else { - panic!("Session wasn't created nor shared"); - }; - - if session.expired() { - panic!("Session expired"); - } - - Ok(session.encrypt(content).await) + self.group_session_manager.encrypt(room_id, content).await } /// Should the client share a group session for the given room. @@ -659,12 +618,8 @@ impl OlmMachine { /// This should be called every time a new room message wants to be sent out /// since group sessions can expire at any time. pub fn should_share_group_session(&self, room_id: &RoomId) -> bool { - let session = self.outbound_group_sessions.get(room_id); - - match session { - Some(s) => !s.shared() || s.expired(), - None => true, - } + self.group_session_manager + .should_share_group_session(room_id) } /// Invalidate the currently active outbound group session for the given @@ -673,7 +628,7 @@ impl OlmMachine { /// Returns true if a session was invalidated, false if there was no session /// to invalidate. pub fn invalidate_group_session(&self, room_id: &RoomId) -> bool { - self.outbound_group_sessions.remove(room_id).is_some() + self.group_session_manager.invalidate_group_session(room_id) } /// Get to-device requests to share a group session with users in a room. @@ -690,67 +645,9 @@ impl OlmMachine { users: impl Iterator, encryption_settings: impl Into, ) -> OlmResult> { - self.create_outbound_group_session(room_id, encryption_settings.into(), users) - .await?; - let session = self.outbound_group_sessions.get(room_id).unwrap(); - - if session.shared() { - panic!("Session is already shared"); - } - - // TODO don't mark the session as shared automatically, only when all - // the requests are done, failure to send these requests will likely end - // up in wedged sessions. We'll need to store the requests and let the - // caller mark them as sent using an UUID. - session.mark_as_shared(); - - let mut devices = Vec::new(); - - for user_id in session.users_to_share_with() { - for device in self.get_user_devices(&user_id).await?.devices() { - if !device.is_blacklisted() { - devices.push(device.clone()); - } - } - } - - let mut requests = Vec::new(); - let key_content = session.as_json().await; - - for device_map_chunk in devices.chunks(OlmMachine::MAX_TO_DEVICE_MESSAGES) { - let mut messages = BTreeMap::new(); - - for device in device_map_chunk { - let encrypted = device - .encrypt(EventType::RoomKey, key_content.clone()) - .await; - - let encrypted = match encrypted { - Ok(c) => c, - Err(OlmError::MissingSession) - | Err(OlmError::EventError(EventError::MissingSenderKey)) => { - continue; - } - Err(e) => return Err(e), - }; - - messages - .entry(device.user_id().clone()) - .or_insert_with(BTreeMap::new) - .insert( - DeviceIdOrAllDevices::DeviceId(device.device_id().into()), - serde_json::value::to_raw_value(&encrypted)?, - ); - } - - requests.push(ToDeviceRequest { - event_type: EventType::RoomEncrypted, - txn_id: Uuid::new_v4(), - messages, - }); - } - - Ok(requests) + self.group_session_manager + .share_group_session(room_id, users, encryption_settings) + .await } /// Receive and properly handle a decrypted to-device event. @@ -1427,11 +1324,17 @@ pub(crate) mod test { .create_outbound_group_session_with_defaults(&room_id) .await .unwrap(); - assert!(machine.outbound_group_sessions.get(&room_id).is_some()); + assert!(machine + .group_session_manager + .get_outbound_group_session(&room_id) + .is_some()); machine.invalidate_group_session(&room_id); - assert!(machine.outbound_group_sessions.get(&room_id).is_none()); + assert!(machine + .group_session_manager + .get_outbound_group_session(&room_id) + .is_none()); } #[tokio::test] @@ -1645,7 +1548,10 @@ pub(crate) mod test { content: to_device_requests_to_content(to_device_requests), }; - let alice_session = alice.outbound_group_sessions.get(&room_id).unwrap(); + let alice_session = alice + .group_session_manager + .get_outbound_group_session(&room_id) + .unwrap(); let event = bob .decrypt_to_device_event(&event)