crypto: Refactor and document the share group session method a bit better
parent
4eb504d000
commit
e5ba0298d0
|
@ -21,14 +21,15 @@ use dashmap::DashMap;
|
|||
use matrix_sdk_common::{
|
||||
api::r0::to_device::DeviceIdOrAllDevices,
|
||||
events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, EventType},
|
||||
identifiers::{DeviceIdBox, RoomId, UserId},
|
||||
identifiers::{DeviceId, DeviceIdBox, RoomId, UserId},
|
||||
uuid::Uuid,
|
||||
};
|
||||
use serde_json::Value;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::{
|
||||
error::{EventError, MegolmResult, OlmResult},
|
||||
olm::{Account, InboundGroupSession, OutboundGroupSession},
|
||||
olm::{Account, InboundGroupSession, OutboundGroupSession, Session},
|
||||
store::{Changes, Store},
|
||||
Device, EncryptionSettings, OlmError, ToDeviceRequest,
|
||||
};
|
||||
|
@ -42,6 +43,8 @@ pub struct GroupSessionManager {
|
|||
store: Store,
|
||||
/// The currently active outbound group sessions.
|
||||
outbound_group_sessions: Arc<DashMap<RoomId, OutboundGroupSession>>,
|
||||
/// A map from the request id to the group session that the request belongs
|
||||
/// to. Used to mark requests belonging to the session as shared.
|
||||
outbound_sessions_being_shared: Arc<DashMap<Uuid, OutboundGroupSession>>,
|
||||
}
|
||||
|
||||
|
@ -143,32 +146,61 @@ impl GroupSessionManager {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get to-device requests to share a group session with users in a room.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// `room_id` - The room id of the room where the group session will be
|
||||
/// used.
|
||||
///
|
||||
/// `users` - The list of users that should receive the group session.
|
||||
pub async fn share_group_session(
|
||||
/// Encrypt the given content for the given devices and create a to-device
|
||||
/// requests that sends the encrypted content to them.
|
||||
async fn encrypt_session_for(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
users: impl Iterator<Item = &UserId>,
|
||||
encryption_settings: impl Into<EncryptionSettings>,
|
||||
) -> OlmResult<Vec<Arc<ToDeviceRequest>>> {
|
||||
let users: HashSet<&UserId> = users.collect();
|
||||
let encryption_settings = encryption_settings.into();
|
||||
let mut changes = Changes::default();
|
||||
content: Value,
|
||||
devices: &[Device],
|
||||
) -> OlmResult<(Uuid, ToDeviceRequest, Vec<Session>)> {
|
||||
let mut messages = BTreeMap::new();
|
||||
let mut changed_sessions = Vec::new();
|
||||
|
||||
let (outbound, inbound) = self
|
||||
.get_or_create_outbound_session(room_id, encryption_settings.clone())
|
||||
.await?;
|
||||
for device in devices {
|
||||
let encrypted = device.encrypt(EventType::RoomKey, content.clone()).await;
|
||||
|
||||
if let Some(inbound) = inbound {
|
||||
changes.inbound_group_sessions.push(inbound);
|
||||
let (used_session, encrypted) = match encrypted {
|
||||
Ok(c) => c,
|
||||
Err(OlmError::MissingSession)
|
||||
| Err(OlmError::EventError(EventError::MissingSenderKey)) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
changed_sessions.push(used_session);
|
||||
|
||||
messages
|
||||
.entry(device.user_id().clone())
|
||||
.or_insert_with(BTreeMap::new)
|
||||
.insert(
|
||||
DeviceIdOrAllDevices::DeviceId(device.device_id().into()),
|
||||
serde_json::value::to_raw_value(&encrypted)?,
|
||||
);
|
||||
}
|
||||
|
||||
let id = Uuid::new_v4();
|
||||
|
||||
let request = ToDeviceRequest {
|
||||
event_type: EventType::RoomEncrypted,
|
||||
txn_id: id,
|
||||
messages,
|
||||
};
|
||||
|
||||
Ok((id, request, changed_sessions))
|
||||
}
|
||||
|
||||
/// Given a list of user and an outbound session get the list of users and
|
||||
/// devices that this session should be shared with.
|
||||
///
|
||||
/// Returns a boolean indicating that the session needs to be rotated and
|
||||
/// the list of users/devices that should receive the session.
|
||||
pub async fn collect_session_recipients(
|
||||
&self,
|
||||
users: impl Iterator<Item = &UserId>,
|
||||
outbound: &OutboundGroupSession,
|
||||
) -> OlmResult<(bool, HashMap<UserId, Vec<Device>>)> {
|
||||
let users: HashSet<&UserId> = users.collect();
|
||||
let mut devices: HashMap<UserId, Vec<Device>> = HashMap::new();
|
||||
|
||||
let users_shared_with: HashSet<UserId> = outbound
|
||||
|
@ -179,9 +211,11 @@ impl GroupSessionManager {
|
|||
|
||||
let users_shared_with: HashSet<&UserId> = users_shared_with.iter().collect();
|
||||
|
||||
// A user left if a user is missing from the set of users that should
|
||||
// get the session but is in the set of users that received the sessoin.
|
||||
let user_left = !users_shared_with
|
||||
.difference(&users)
|
||||
.collect::<HashSet<&&UserId>>()
|
||||
.collect::<HashSet<_>>()
|
||||
.is_empty();
|
||||
|
||||
let mut device_got_deleted = false;
|
||||
|
@ -189,12 +223,16 @@ impl GroupSessionManager {
|
|||
for user_id in users {
|
||||
let user_devices = self.store.get_user_devices(&user_id).await?;
|
||||
|
||||
if !device_got_deleted {
|
||||
let device_ids: HashSet<DeviceIdBox> = user_devices.keys().cloned().collect();
|
||||
// If no device got deleted until now and no user left check if one
|
||||
// got deleted for this user.
|
||||
if !device_got_deleted && !user_left {
|
||||
let device_ids: HashSet<&DeviceId> =
|
||||
user_devices.keys().map(|d| d.as_ref()).collect();
|
||||
|
||||
device_got_deleted = if let Some(shared) = outbound.shared_with_set.get(user_id) {
|
||||
#[allow(clippy::map_clone)]
|
||||
let shared: HashSet<DeviceIdBox> = shared.iter().map(|d| d.clone()).collect();
|
||||
let shared: HashSet<&DeviceId> = shared.iter().map(|d| d.as_ref()).collect();
|
||||
!shared
|
||||
.difference(&device_ids)
|
||||
.collect::<HashSet<_>>()
|
||||
|
@ -210,7 +248,42 @@ impl GroupSessionManager {
|
|||
.extend(user_devices.devices().filter(|d| !d.is_blacklisted()));
|
||||
}
|
||||
|
||||
let outbound = if user_left || device_got_deleted {
|
||||
// To protect the room history we need to rotate the session if a user
|
||||
// left or if a device got deleted, put differently if someone leaves
|
||||
// the encrypted group.
|
||||
let should_rotate = user_left || device_got_deleted;
|
||||
|
||||
Ok((should_rotate, devices))
|
||||
}
|
||||
|
||||
/// Get to-device requests to share a group session with users in a room.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// `room_id` - The room id of the room where the group session will be
|
||||
/// used.
|
||||
///
|
||||
/// `users` - The list of users that should receive the group session.
|
||||
pub async fn share_group_session(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
users: impl Iterator<Item = &UserId>,
|
||||
encryption_settings: impl Into<EncryptionSettings>,
|
||||
) -> OlmResult<Vec<Arc<ToDeviceRequest>>> {
|
||||
let encryption_settings = encryption_settings.into();
|
||||
let mut changes = Changes::default();
|
||||
|
||||
let (outbound, inbound) = self
|
||||
.get_or_create_outbound_session(room_id, encryption_settings.clone())
|
||||
.await?;
|
||||
|
||||
if let Some(inbound) = inbound {
|
||||
changes.inbound_group_sessions.push(inbound);
|
||||
}
|
||||
|
||||
let (should_rotate, devices) = self.collect_session_recipients(users, &outbound).await?;
|
||||
|
||||
let outbound = if should_rotate {
|
||||
let (outbound, inbound) = self
|
||||
.create_outbound_group_session(room_id, encryption_settings)
|
||||
.await?;
|
||||
|
@ -252,42 +325,12 @@ impl GroupSessionManager {
|
|||
let key_content = outbound.as_json().await;
|
||||
|
||||
for device_map_chunk in devices.chunks(Self::MAX_TO_DEVICE_MESSAGES) {
|
||||
let mut messages = BTreeMap::new();
|
||||
let (id, request, used_sessions) = self
|
||||
.encrypt_session_for(key_content.clone(), device_map_chunk)
|
||||
.await?;
|
||||
|
||||
for device in device_map_chunk {
|
||||
let encrypted = device
|
||||
.encrypt(EventType::RoomKey, key_content.clone())
|
||||
.await;
|
||||
|
||||
let (used_session, encrypted) = match encrypted {
|
||||
Ok(c) => c,
|
||||
Err(OlmError::MissingSession)
|
||||
| Err(OlmError::EventError(EventError::MissingSenderKey)) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
changes.sessions.push(used_session);
|
||||
|
||||
messages
|
||||
.entry(device.user_id().clone())
|
||||
.or_insert_with(BTreeMap::new)
|
||||
.insert(
|
||||
DeviceIdOrAllDevices::DeviceId(device.device_id().into()),
|
||||
serde_json::value::to_raw_value(&encrypted)?,
|
||||
);
|
||||
}
|
||||
|
||||
let id = Uuid::new_v4();
|
||||
|
||||
let request = Arc::new(ToDeviceRequest {
|
||||
event_type: EventType::RoomEncrypted,
|
||||
txn_id: id,
|
||||
messages,
|
||||
});
|
||||
|
||||
outbound.add_request(id, request);
|
||||
outbound.add_request(id, request.into());
|
||||
changes.sessions.extend(used_sessions);
|
||||
self.outbound_sessions_being_shared
|
||||
.insert(id, outbound.clone());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue