crypto: Move the group session handling logic into separate module.
parent
646f18ae18
commit
ff2079da91
|
@ -0,0 +1,228 @@
|
||||||
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::{collections::BTreeMap, sync::Arc};
|
||||||
|
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use matrix_sdk_common::{
|
||||||
|
api::r0::to_device::DeviceIdOrAllDevices,
|
||||||
|
events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, EventType},
|
||||||
|
identifiers::{RoomId, UserId},
|
||||||
|
uuid::Uuid,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
error::{EventError, MegolmResult, OlmResult},
|
||||||
|
key_request::Device,
|
||||||
|
olm::{Account, OutboundGroupSession},
|
||||||
|
store::{Result as StoreResult, Store},
|
||||||
|
EncryptionSettings, OlmError, ToDeviceRequest,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct GroupSessionManager {
|
||||||
|
account: Account,
|
||||||
|
/// Store for the encryption keys.
|
||||||
|
/// Persists all the encryption keys so a client can resume the session
|
||||||
|
/// without the need to create new keys.
|
||||||
|
store: Store,
|
||||||
|
/// The currently active outbound group sessions.
|
||||||
|
outbound_group_sessions: Arc<DashMap<RoomId, OutboundGroupSession>>,
|
||||||
|
outbound_sessions_being_shared: Arc<DashMap<Uuid, OutboundGroupSession>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GroupSessionManager {
|
||||||
|
const MAX_TO_DEVICE_MESSAGES: usize = 20;
|
||||||
|
|
||||||
|
pub(crate) fn new(account: Account, store: Store) -> Self {
|
||||||
|
Self {
|
||||||
|
account,
|
||||||
|
store,
|
||||||
|
outbound_group_sessions: Arc::new(DashMap::new()),
|
||||||
|
outbound_sessions_being_shared: Arc::new(DashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_user_devices(&self, user_id: &UserId) -> StoreResult<Vec<Device>> {
|
||||||
|
let devices = self.store.get_user_devices(user_id).await?;
|
||||||
|
|
||||||
|
let own_identity = self
|
||||||
|
.store
|
||||||
|
.get_user_identity(self.account.user_id())
|
||||||
|
.await?
|
||||||
|
.map(|i| i.own().cloned())
|
||||||
|
.flatten();
|
||||||
|
let device_owner_identity = self.store.get_user_identity(user_id).await.ok().flatten();
|
||||||
|
|
||||||
|
Ok(devices
|
||||||
|
.devices()
|
||||||
|
.map(|d| Device {
|
||||||
|
inner: d.clone(),
|
||||||
|
store: self.store.clone(),
|
||||||
|
own_identity: own_identity.clone(),
|
||||||
|
device_owner_identity: device_owner_identity.clone(),
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn invalidate_group_session(&self, room_id: &RoomId) -> bool {
|
||||||
|
self.outbound_group_sessions.remove(room_id).is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get an outbound group session for a room, if one exists.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `room_id` - The id of the room for which we should get the outbound
|
||||||
|
/// group session.
|
||||||
|
pub fn get_outbound_group_session(&self, room_id: &RoomId) -> Option<OutboundGroupSession> {
|
||||||
|
#[allow(clippy::map_clone)]
|
||||||
|
self.outbound_group_sessions.get(room_id).map(|s| s.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn encrypt(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
content: AnyMessageEventContent,
|
||||||
|
) -> MegolmResult<EncryptedEventContent> {
|
||||||
|
let session = if let Some(s) = self.get_outbound_group_session(room_id) {
|
||||||
|
s
|
||||||
|
} else {
|
||||||
|
panic!("Session wasn't created nor shared");
|
||||||
|
};
|
||||||
|
|
||||||
|
if session.expired() {
|
||||||
|
panic!("Session expired");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(session.encrypt(content).await)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Should the client share a group session for the given room.
|
||||||
|
///
|
||||||
|
/// Returns true if a session needs to be shared before room messages can be
|
||||||
|
/// encrypted, false if one is already shared and ready to encrypt room
|
||||||
|
/// messages.
|
||||||
|
///
|
||||||
|
/// This should be called every time a new room message wants to be sent out
|
||||||
|
/// since group sessions can expire at any time.
|
||||||
|
pub fn should_share_group_session(&self, room_id: &RoomId) -> bool {
|
||||||
|
let session = self.outbound_group_sessions.get(room_id);
|
||||||
|
|
||||||
|
match session {
|
||||||
|
Some(s) => !s.shared() || s.expired(),
|
||||||
|
None => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new outbound group session.
|
||||||
|
///
|
||||||
|
/// This also creates a matching inbound group session and saves that one in
|
||||||
|
/// the store.
|
||||||
|
pub async fn create_outbound_group_session(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
settings: EncryptionSettings,
|
||||||
|
users_to_share_with: impl Iterator<Item = &UserId>,
|
||||||
|
) -> OlmResult<()> {
|
||||||
|
let (outbound, inbound) = self
|
||||||
|
.account
|
||||||
|
.create_group_session_pair(room_id, settings, users_to_share_with)
|
||||||
|
.await
|
||||||
|
.map_err(|_| EventError::UnsupportedAlgorithm)?;
|
||||||
|
|
||||||
|
let _ = self.store.save_inbound_group_sessions(&[inbound]).await?;
|
||||||
|
|
||||||
|
let _ = self
|
||||||
|
.outbound_group_sessions
|
||||||
|
.insert(room_id.to_owned(), outbound);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get to-device requests to share a group session with users in a room.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// `room_id` - The room id of the room where the group session will be
|
||||||
|
/// used.
|
||||||
|
///
|
||||||
|
/// `users` - The list of users that should receive the group session.
|
||||||
|
pub async fn share_group_session(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
users: impl Iterator<Item = &UserId>,
|
||||||
|
encryption_settings: impl Into<EncryptionSettings>,
|
||||||
|
) -> OlmResult<Vec<ToDeviceRequest>> {
|
||||||
|
self.create_outbound_group_session(room_id, encryption_settings.into(), users)
|
||||||
|
.await?;
|
||||||
|
let session = self.outbound_group_sessions.get(room_id).unwrap();
|
||||||
|
|
||||||
|
if session.shared() {
|
||||||
|
panic!("Session is already shared");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO don't mark the session as shared automatically, only when all
|
||||||
|
// the requests are done, failure to send these requests will likely end
|
||||||
|
// up in wedged sessions. We'll need to store the requests and let the
|
||||||
|
// caller mark them as sent using an UUID.
|
||||||
|
session.mark_as_shared();
|
||||||
|
|
||||||
|
let mut devices = Vec::new();
|
||||||
|
|
||||||
|
for user_id in session.users_to_share_with() {
|
||||||
|
let mut user_devices = self.get_user_devices(&user_id).await?;
|
||||||
|
devices.extend(user_devices.drain(..).filter(|d| !d.is_blacklisted()))
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut requests = Vec::new();
|
||||||
|
let key_content = session.as_json().await;
|
||||||
|
|
||||||
|
for device_map_chunk in devices.chunks(Self::MAX_TO_DEVICE_MESSAGES) {
|
||||||
|
let mut messages = BTreeMap::new();
|
||||||
|
|
||||||
|
for device in device_map_chunk {
|
||||||
|
let encrypted = device
|
||||||
|
.encrypt(EventType::RoomKey, key_content.clone())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let encrypted = match encrypted {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(OlmError::MissingSession)
|
||||||
|
| Err(OlmError::EventError(EventError::MissingSenderKey)) => {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
messages
|
||||||
|
.entry(device.user_id().clone())
|
||||||
|
.or_insert_with(BTreeMap::new)
|
||||||
|
.insert(
|
||||||
|
DeviceIdOrAllDevices::DeviceId(device.device_id().into()),
|
||||||
|
serde_json::value::to_raw_value(&encrypted)?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
|
||||||
|
requests.push(ToDeviceRequest {
|
||||||
|
event_type: EventType::RoomEncrypted,
|
||||||
|
txn_id: id,
|
||||||
|
messages,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(requests)
|
||||||
|
}
|
||||||
|
}
|
|
@ -115,6 +115,7 @@ impl Device {
|
||||||
/// * `event_type` - The type of the event.
|
/// * `event_type` - The type of the event.
|
||||||
///
|
///
|
||||||
/// * `content` - The content of the event that should be encrypted.
|
/// * `content` - The content of the event that should be encrypted.
|
||||||
|
#[cfg(test)]
|
||||||
pub(crate) async fn encrypt(
|
pub(crate) async fn encrypt(
|
||||||
&self,
|
&self,
|
||||||
event_type: EventType,
|
event_type: EventType,
|
||||||
|
|
|
@ -50,11 +50,11 @@ use crate::{
|
||||||
store::{CryptoStoreError, Store},
|
store::{CryptoStoreError, Store},
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Device {
|
pub struct Device {
|
||||||
inner: ReadOnlyDevice,
|
pub(crate) inner: ReadOnlyDevice,
|
||||||
store: Store,
|
pub(crate) store: Store,
|
||||||
own_identity: Option<OwnUserIdentity>,
|
pub(crate) own_identity: Option<OwnUserIdentity>,
|
||||||
device_owner_identity: Option<UserIdentities>,
|
pub(crate) device_owner_identity: Option<UserIdentities>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Device {
|
impl Device {
|
||||||
|
@ -88,7 +88,7 @@ impl Device {
|
||||||
.trust_state(&self.own_identity, &self.device_owner_identity)
|
.trust_state(&self.own_identity, &self.device_owner_identity)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn encrypt(
|
pub async fn encrypt(
|
||||||
&self,
|
&self,
|
||||||
event_type: EventType,
|
event_type: EventType,
|
||||||
content: Value,
|
content: Value,
|
||||||
|
|
|
@ -29,6 +29,7 @@
|
||||||
|
|
||||||
mod error;
|
mod error;
|
||||||
mod file_encryption;
|
mod file_encryption;
|
||||||
|
mod group_manager;
|
||||||
mod identities;
|
mod identities;
|
||||||
mod key_request;
|
mod key_request;
|
||||||
mod machine;
|
mod machine;
|
||||||
|
|
|
@ -27,12 +27,11 @@ use matrix_sdk_common::{
|
||||||
upload_keys,
|
upload_keys,
|
||||||
},
|
},
|
||||||
sync::sync_events::Response as SyncResponse,
|
sync::sync_events::Response as SyncResponse,
|
||||||
to_device::DeviceIdOrAllDevices,
|
|
||||||
},
|
},
|
||||||
assign,
|
assign,
|
||||||
events::{
|
events::{
|
||||||
room::encrypted::EncryptedEventContent, room_key::RoomKeyEventContent,
|
room::encrypted::EncryptedEventContent, room_key::RoomKeyEventContent,
|
||||||
AnyMessageEventContent, AnySyncRoomEvent, AnyToDeviceEvent, EventType, SyncMessageEvent,
|
AnyMessageEventContent, AnySyncRoomEvent, AnyToDeviceEvent, SyncMessageEvent,
|
||||||
ToDeviceEvent,
|
ToDeviceEvent,
|
||||||
},
|
},
|
||||||
identifiers::{
|
identifiers::{
|
||||||
|
@ -44,18 +43,20 @@ use matrix_sdk_common::{
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "sqlite_cryptostore")]
|
#[cfg(feature = "sqlite_cryptostore")]
|
||||||
use super::store::sqlite::SqliteStore;
|
use crate::store::sqlite::SqliteStore;
|
||||||
use super::{
|
use crate::{
|
||||||
error::{EventError, MegolmError, MegolmResult, OlmError, OlmResult},
|
error::{EventError, MegolmError, MegolmResult, OlmResult},
|
||||||
|
group_manager::GroupSessionManager,
|
||||||
identities::{Device, IdentityManager, ReadOnlyDevice, UserDevices, UserIdentities},
|
identities::{Device, IdentityManager, ReadOnlyDevice, UserDevices, UserIdentities},
|
||||||
key_request::KeyRequestMachine,
|
key_request::KeyRequestMachine,
|
||||||
olm::{
|
olm::{
|
||||||
Account, EncryptionSettings, ExportedRoomKey, GroupSessionKey, IdentityKeys,
|
Account, EncryptionSettings, ExportedRoomKey, GroupSessionKey, IdentityKeys,
|
||||||
InboundGroupSession, OutboundGroupSession, ReadOnlyAccount,
|
InboundGroupSession, ReadOnlyAccount,
|
||||||
},
|
},
|
||||||
requests::{IncomingResponse, OutgoingRequest, ToDeviceRequest},
|
requests::{IncomingResponse, OutgoingRequest},
|
||||||
store::{CryptoStore, MemoryStore, Result as StoreResult, Store},
|
store::{CryptoStore, MemoryStore, Result as StoreResult, Store},
|
||||||
verification::{Sas, VerificationMachine},
|
verification::{Sas, VerificationMachine},
|
||||||
|
ToDeviceRequest,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// State machine implementation of the Olm/Megolm encryption protocol used for
|
/// State machine implementation of the Olm/Megolm encryption protocol used for
|
||||||
|
@ -72,8 +73,8 @@ pub struct OlmMachine {
|
||||||
/// Persists all the encryption keys so a client can resume the session
|
/// Persists all the encryption keys so a client can resume the session
|
||||||
/// without the need to create new keys.
|
/// without the need to create new keys.
|
||||||
store: Store,
|
store: Store,
|
||||||
/// The currently active outbound group sessions.
|
/// A state machine that keeps track of our outbound group sessions.
|
||||||
outbound_group_sessions: Arc<DashMap<RoomId, OutboundGroupSession>>,
|
group_session_manager: GroupSessionManager,
|
||||||
/// A state machine that is responsible to handle and keep track of SAS
|
/// A state machine that is responsible to handle and keep track of SAS
|
||||||
/// verification flows.
|
/// verification flows.
|
||||||
verification_machine: VerificationMachine,
|
verification_machine: VerificationMachine,
|
||||||
|
@ -96,7 +97,6 @@ impl std::fmt::Debug for OlmMachine {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OlmMachine {
|
impl OlmMachine {
|
||||||
const MAX_TO_DEVICE_MESSAGES: usize = 20;
|
|
||||||
const KEY_CLAIM_TIMEOUT: Duration = Duration::from_secs(10);
|
const KEY_CLAIM_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
/// Create a new memory based OlmMachine.
|
/// Create a new memory based OlmMachine.
|
||||||
|
@ -137,16 +137,19 @@ impl OlmMachine {
|
||||||
);
|
);
|
||||||
let identity_manager =
|
let identity_manager =
|
||||||
IdentityManager::new(user_id.clone(), device_id.clone(), store.clone());
|
IdentityManager::new(user_id.clone(), device_id.clone(), store.clone());
|
||||||
|
let account = Account {
|
||||||
|
inner: account,
|
||||||
|
store: store.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let group_session_manager = GroupSessionManager::new(account.clone(), store.clone());
|
||||||
|
|
||||||
OlmMachine {
|
OlmMachine {
|
||||||
user_id,
|
user_id,
|
||||||
device_id,
|
device_id,
|
||||||
account: Account {
|
account,
|
||||||
inner: account,
|
|
||||||
store: store.clone(),
|
|
||||||
},
|
|
||||||
store,
|
store,
|
||||||
outbound_group_sessions,
|
group_session_manager,
|
||||||
verification_machine,
|
verification_machine,
|
||||||
key_request_machine,
|
key_request_machine,
|
||||||
identity_manager,
|
identity_manager,
|
||||||
|
@ -564,50 +567,16 @@ impl OlmMachine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new outbound group session.
|
|
||||||
///
|
|
||||||
/// This also creates a matching inbound group session and saves that one in
|
|
||||||
/// the store.
|
|
||||||
async fn create_outbound_group_session(
|
|
||||||
&self,
|
|
||||||
room_id: &RoomId,
|
|
||||||
settings: EncryptionSettings,
|
|
||||||
users_to_share_with: impl Iterator<Item = &UserId>,
|
|
||||||
) -> OlmResult<()> {
|
|
||||||
let (outbound, inbound) = self
|
|
||||||
.account
|
|
||||||
.create_group_session_pair(room_id, settings, users_to_share_with)
|
|
||||||
.await
|
|
||||||
.map_err(|_| EventError::UnsupportedAlgorithm)?;
|
|
||||||
|
|
||||||
let _ = self.store.save_inbound_group_sessions(&[inbound]).await?;
|
|
||||||
|
|
||||||
let _ = self
|
|
||||||
.outbound_group_sessions
|
|
||||||
.insert(room_id.to_owned(), outbound);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) async fn create_outbound_group_session_with_defaults(
|
pub(crate) async fn create_outbound_group_session_with_defaults(
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
) -> OlmResult<()> {
|
) -> OlmResult<()> {
|
||||||
self.create_outbound_group_session(room_id, EncryptionSettings::default(), [].iter())
|
self.group_session_manager
|
||||||
|
.create_outbound_group_session(room_id, EncryptionSettings::default(), [].iter())
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an outbound group session for a room, if one exists.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `room_id` - The id of the room for which we should get the outbound
|
|
||||||
/// group session.
|
|
||||||
fn get_outbound_group_session(&self, room_id: &RoomId) -> Option<OutboundGroupSession> {
|
|
||||||
#[allow(clippy::map_clone)]
|
|
||||||
self.outbound_group_sessions.get(room_id).map(|s| s.clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Encrypt a room message for the given room.
|
/// Encrypt a room message for the given room.
|
||||||
///
|
///
|
||||||
/// Beware that a group session needs to be shared before this method can be
|
/// Beware that a group session needs to be shared before this method can be
|
||||||
|
@ -637,17 +606,7 @@ impl OlmMachine {
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
content: AnyMessageEventContent,
|
content: AnyMessageEventContent,
|
||||||
) -> MegolmResult<EncryptedEventContent> {
|
) -> MegolmResult<EncryptedEventContent> {
|
||||||
let session = if let Some(s) = self.get_outbound_group_session(room_id) {
|
self.group_session_manager.encrypt(room_id, content).await
|
||||||
s
|
|
||||||
} else {
|
|
||||||
panic!("Session wasn't created nor shared");
|
|
||||||
};
|
|
||||||
|
|
||||||
if session.expired() {
|
|
||||||
panic!("Session expired");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(session.encrypt(content).await)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Should the client share a group session for the given room.
|
/// Should the client share a group session for the given room.
|
||||||
|
@ -659,12 +618,8 @@ impl OlmMachine {
|
||||||
/// This should be called every time a new room message wants to be sent out
|
/// This should be called every time a new room message wants to be sent out
|
||||||
/// since group sessions can expire at any time.
|
/// since group sessions can expire at any time.
|
||||||
pub fn should_share_group_session(&self, room_id: &RoomId) -> bool {
|
pub fn should_share_group_session(&self, room_id: &RoomId) -> bool {
|
||||||
let session = self.outbound_group_sessions.get(room_id);
|
self.group_session_manager
|
||||||
|
.should_share_group_session(room_id)
|
||||||
match session {
|
|
||||||
Some(s) => !s.shared() || s.expired(),
|
|
||||||
None => true,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Invalidate the currently active outbound group session for the given
|
/// Invalidate the currently active outbound group session for the given
|
||||||
|
@ -673,7 +628,7 @@ impl OlmMachine {
|
||||||
/// Returns true if a session was invalidated, false if there was no session
|
/// Returns true if a session was invalidated, false if there was no session
|
||||||
/// to invalidate.
|
/// to invalidate.
|
||||||
pub fn invalidate_group_session(&self, room_id: &RoomId) -> bool {
|
pub fn invalidate_group_session(&self, room_id: &RoomId) -> bool {
|
||||||
self.outbound_group_sessions.remove(room_id).is_some()
|
self.group_session_manager.invalidate_group_session(room_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get to-device requests to share a group session with users in a room.
|
/// Get to-device requests to share a group session with users in a room.
|
||||||
|
@ -690,67 +645,9 @@ impl OlmMachine {
|
||||||
users: impl Iterator<Item = &UserId>,
|
users: impl Iterator<Item = &UserId>,
|
||||||
encryption_settings: impl Into<EncryptionSettings>,
|
encryption_settings: impl Into<EncryptionSettings>,
|
||||||
) -> OlmResult<Vec<ToDeviceRequest>> {
|
) -> OlmResult<Vec<ToDeviceRequest>> {
|
||||||
self.create_outbound_group_session(room_id, encryption_settings.into(), users)
|
self.group_session_manager
|
||||||
.await?;
|
.share_group_session(room_id, users, encryption_settings)
|
||||||
let session = self.outbound_group_sessions.get(room_id).unwrap();
|
.await
|
||||||
|
|
||||||
if session.shared() {
|
|
||||||
panic!("Session is already shared");
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO don't mark the session as shared automatically, only when all
|
|
||||||
// the requests are done, failure to send these requests will likely end
|
|
||||||
// up in wedged sessions. We'll need to store the requests and let the
|
|
||||||
// caller mark them as sent using an UUID.
|
|
||||||
session.mark_as_shared();
|
|
||||||
|
|
||||||
let mut devices = Vec::new();
|
|
||||||
|
|
||||||
for user_id in session.users_to_share_with() {
|
|
||||||
for device in self.get_user_devices(&user_id).await?.devices() {
|
|
||||||
if !device.is_blacklisted() {
|
|
||||||
devices.push(device.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut requests = Vec::new();
|
|
||||||
let key_content = session.as_json().await;
|
|
||||||
|
|
||||||
for device_map_chunk in devices.chunks(OlmMachine::MAX_TO_DEVICE_MESSAGES) {
|
|
||||||
let mut messages = BTreeMap::new();
|
|
||||||
|
|
||||||
for device in device_map_chunk {
|
|
||||||
let encrypted = device
|
|
||||||
.encrypt(EventType::RoomKey, key_content.clone())
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let encrypted = match encrypted {
|
|
||||||
Ok(c) => c,
|
|
||||||
Err(OlmError::MissingSession)
|
|
||||||
| Err(OlmError::EventError(EventError::MissingSenderKey)) => {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
};
|
|
||||||
|
|
||||||
messages
|
|
||||||
.entry(device.user_id().clone())
|
|
||||||
.or_insert_with(BTreeMap::new)
|
|
||||||
.insert(
|
|
||||||
DeviceIdOrAllDevices::DeviceId(device.device_id().into()),
|
|
||||||
serde_json::value::to_raw_value(&encrypted)?,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
requests.push(ToDeviceRequest {
|
|
||||||
event_type: EventType::RoomEncrypted,
|
|
||||||
txn_id: Uuid::new_v4(),
|
|
||||||
messages,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(requests)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive and properly handle a decrypted to-device event.
|
/// Receive and properly handle a decrypted to-device event.
|
||||||
|
@ -1427,11 +1324,17 @@ pub(crate) mod test {
|
||||||
.create_outbound_group_session_with_defaults(&room_id)
|
.create_outbound_group_session_with_defaults(&room_id)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(machine.outbound_group_sessions.get(&room_id).is_some());
|
assert!(machine
|
||||||
|
.group_session_manager
|
||||||
|
.get_outbound_group_session(&room_id)
|
||||||
|
.is_some());
|
||||||
|
|
||||||
machine.invalidate_group_session(&room_id);
|
machine.invalidate_group_session(&room_id);
|
||||||
|
|
||||||
assert!(machine.outbound_group_sessions.get(&room_id).is_none());
|
assert!(machine
|
||||||
|
.group_session_manager
|
||||||
|
.get_outbound_group_session(&room_id)
|
||||||
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -1645,7 +1548,10 @@ pub(crate) mod test {
|
||||||
content: to_device_requests_to_content(to_device_requests),
|
content: to_device_requests_to_content(to_device_requests),
|
||||||
};
|
};
|
||||||
|
|
||||||
let alice_session = alice.outbound_group_sessions.get(&room_id).unwrap();
|
let alice_session = alice
|
||||||
|
.group_session_manager
|
||||||
|
.get_outbound_group_session(&room_id)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let event = bob
|
let event = bob
|
||||||
.decrypt_to_device_event(&event)
|
.decrypt_to_device_event(&event)
|
||||||
|
|
Loading…
Reference in New Issue