crypto: Change the way we share group sessions
This patch removes the need to ask if a group session needs to be shared it also adapts the method so it re-shares sessions if new users or devices join the group.master
parent
40c53f09ba
commit
bab8fde0ac
|
@ -1139,7 +1139,6 @@ impl Client {
|
||||||
async fn preshare_group_session(&self, room_id: &RoomId) -> Result<()> {
|
async fn preshare_group_session(&self, room_id: &RoomId) -> Result<()> {
|
||||||
// TODO expose this publicly so people can pre-share a group session if
|
// TODO expose this publicly so people can pre-share a group session if
|
||||||
// e.g. a user starts to type a message for a room.
|
// e.g. a user starts to type a message for a room.
|
||||||
if self.base_client.should_share_group_session(room_id).await {
|
|
||||||
#[allow(clippy::map_clone)]
|
#[allow(clippy::map_clone)]
|
||||||
if let Some(mutex) = self.group_session_locks.get(room_id).map(|m| m.clone()) {
|
if let Some(mutex) = self.group_session_locks.get(room_id).map(|m| m.clone()) {
|
||||||
// If a group session share request is already going on,
|
// If a group session share request is already going on,
|
||||||
|
@ -1174,7 +1173,6 @@ impl Client {
|
||||||
return Err(r);
|
return Err(r);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1858,11 +1856,7 @@ impl Client {
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
|
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
|
||||||
#[instrument]
|
#[instrument]
|
||||||
async fn share_group_session(&self, room_id: &RoomId) -> Result<()> {
|
async fn share_group_session(&self, room_id: &RoomId) -> Result<()> {
|
||||||
let mut requests = self
|
let mut requests = self.base_client.share_group_session(room_id).await?;
|
||||||
.base_client
|
|
||||||
.share_group_session(room_id)
|
|
||||||
.await
|
|
||||||
.expect("Keys don't need to be uploaded");
|
|
||||||
|
|
||||||
for request in requests.drain(..) {
|
for request in requests.drain(..) {
|
||||||
let response = self.send_to_device(&request).await?;
|
let response = self.send_to_device(&request).await?;
|
||||||
|
|
|
@ -969,25 +969,6 @@ impl BaseClient {
|
||||||
self.store.get_filter(filter_name).await
|
self.store.get_filter(filter_name).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Should the client share a group session for the given room.
|
|
||||||
///
|
|
||||||
/// Returns true if a session needs to be shared before room messages can be
|
|
||||||
/// encrypted, false if one is already shared and ready to encrypt room
|
|
||||||
/// messages.
|
|
||||||
///
|
|
||||||
/// This should be called every time a new room message wants to be sent out
|
|
||||||
/// since group sessions can expire at any time.
|
|
||||||
#[cfg(feature = "encryption")]
|
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
|
|
||||||
pub async fn should_share_group_session(&self, room_id: &RoomId) -> bool {
|
|
||||||
let olm = self.olm.lock().await;
|
|
||||||
|
|
||||||
match &*olm {
|
|
||||||
Some(o) => o.should_share_group_session(room_id),
|
|
||||||
None => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the outgoing requests that need to be sent out.
|
/// Get the outgoing requests that need to be sent out.
|
||||||
///
|
///
|
||||||
/// This returns a list of `OutGoingRequest`, those requests need to be sent
|
/// This returns a list of `OutGoingRequest`, those requests need to be sent
|
||||||
|
|
|
@ -32,7 +32,6 @@ use crate::{
|
||||||
UserIdentity, UserSigningPubkey,
|
UserIdentity, UserSigningPubkey,
|
||||||
},
|
},
|
||||||
requests::KeysQueryRequest,
|
requests::KeysQueryRequest,
|
||||||
session_manager::GroupSessionManager,
|
|
||||||
store::{Changes, DeviceChanges, IdentityChanges, Result as StoreResult, Store},
|
store::{Changes, DeviceChanges, IdentityChanges, Result as StoreResult, Store},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -40,22 +39,15 @@ use crate::{
|
||||||
pub(crate) struct IdentityManager {
|
pub(crate) struct IdentityManager {
|
||||||
user_id: Arc<UserId>,
|
user_id: Arc<UserId>,
|
||||||
device_id: Arc<DeviceIdBox>,
|
device_id: Arc<DeviceIdBox>,
|
||||||
group_manager: GroupSessionManager,
|
|
||||||
store: Store,
|
store: Store,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IdentityManager {
|
impl IdentityManager {
|
||||||
pub fn new(
|
pub fn new(user_id: Arc<UserId>, device_id: Arc<DeviceIdBox>, store: Store) -> Self {
|
||||||
user_id: Arc<UserId>,
|
|
||||||
device_id: Arc<DeviceIdBox>,
|
|
||||||
store: Store,
|
|
||||||
group_manager: GroupSessionManager,
|
|
||||||
) -> Self {
|
|
||||||
IdentityManager {
|
IdentityManager {
|
||||||
user_id,
|
user_id,
|
||||||
device_id,
|
device_id,
|
||||||
store,
|
store,
|
||||||
group_manager,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,8 +110,6 @@ impl IdentityManager {
|
||||||
&self,
|
&self,
|
||||||
device_keys_map: &BTreeMap<UserId, BTreeMap<DeviceIdBox, DeviceKeys>>,
|
device_keys_map: &BTreeMap<UserId, BTreeMap<DeviceIdBox, DeviceKeys>>,
|
||||||
) -> StoreResult<DeviceChanges> {
|
) -> StoreResult<DeviceChanges> {
|
||||||
let mut users_with_new_or_deleted_devices = HashSet::new();
|
|
||||||
|
|
||||||
let mut changes = DeviceChanges::default();
|
let mut changes = DeviceChanges::default();
|
||||||
|
|
||||||
for (user_id, device_map) in device_keys_map {
|
for (user_id, device_map) in device_keys_map {
|
||||||
|
@ -165,7 +155,6 @@ impl IdentityManager {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
info!("Adding a new device to the device store {:?}", device);
|
info!("Adding a new device to the device store {:?}", device);
|
||||||
users_with_new_or_deleted_devices.insert(user_id);
|
|
||||||
changes.new.push(device);
|
changes.new.push(device);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -177,7 +166,6 @@ impl IdentityManager {
|
||||||
let deleted_devices_set = stored_devices_set.difference(¤t_devices);
|
let deleted_devices_set = stored_devices_set.difference(¤t_devices);
|
||||||
|
|
||||||
for device_id in deleted_devices_set {
|
for device_id in deleted_devices_set {
|
||||||
users_with_new_or_deleted_devices.insert(user_id);
|
|
||||||
if let Some(device) = stored_devices.get(*device_id) {
|
if let Some(device) = stored_devices.get(*device_id) {
|
||||||
device.mark_as_deleted();
|
device.mark_as_deleted();
|
||||||
changes.deleted.push(device.clone());
|
changes.deleted.push(device.clone());
|
||||||
|
@ -185,9 +173,6 @@ impl IdentityManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.group_manager
|
|
||||||
.invalidate_sessions_new_devices(&users_with_new_or_deleted_devices);
|
|
||||||
|
|
||||||
Ok(changes)
|
Ok(changes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,7 +362,7 @@ pub(crate) mod test {
|
||||||
|
|
||||||
use matrix_sdk_common::{
|
use matrix_sdk_common::{
|
||||||
api::r0::keys::get_keys::Response as KeyQueryResponse,
|
api::r0::keys::get_keys::Response as KeyQueryResponse,
|
||||||
identifiers::{room_id, user_id, DeviceIdBox, RoomId, UserId},
|
identifiers::{user_id, DeviceIdBox, UserId},
|
||||||
locks::Mutex,
|
locks::Mutex,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -388,8 +373,7 @@ pub(crate) mod test {
|
||||||
use crate::{
|
use crate::{
|
||||||
identities::IdentityManager,
|
identities::IdentityManager,
|
||||||
machine::test::response_from_file,
|
machine::test::response_from_file,
|
||||||
olm::{Account, PrivateCrossSigningIdentity, ReadOnlyAccount},
|
olm::{PrivateCrossSigningIdentity, ReadOnlyAccount},
|
||||||
session_manager::GroupSessionManager,
|
|
||||||
store::{CryptoStore, MemoryStore, Store},
|
store::{CryptoStore, MemoryStore, Store},
|
||||||
verification::VerificationMachine,
|
verification::VerificationMachine,
|
||||||
};
|
};
|
||||||
|
@ -406,10 +390,6 @@ pub(crate) mod test {
|
||||||
"WSKKLTJZCL".into()
|
"WSKKLTJZCL".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn room_id() -> RoomId {
|
|
||||||
room_id!("!test:localhost")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn manager() -> IdentityManager {
|
fn manager() -> IdentityManager {
|
||||||
let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(user_id())));
|
let identity = Arc::new(Mutex::new(PrivateCrossSigningIdentity::empty(user_id())));
|
||||||
let user_id = Arc::new(user_id());
|
let user_id = Arc::new(user_id());
|
||||||
|
@ -422,12 +402,7 @@ pub(crate) mod test {
|
||||||
Arc::new(Box::new(MemoryStore::new())),
|
Arc::new(Box::new(MemoryStore::new())),
|
||||||
verification,
|
verification,
|
||||||
);
|
);
|
||||||
let account = Account {
|
IdentityManager::new(user_id, Arc::new(device_id()), store)
|
||||||
inner: account,
|
|
||||||
store: store.clone(),
|
|
||||||
};
|
|
||||||
let group = GroupSessionManager::new(account, store.clone());
|
|
||||||
IdentityManager::new(user_id, Arc::new(device_id()), store, group)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn other_key_query() -> KeyQueryResponse {
|
pub(crate) fn other_key_query() -> KeyQueryResponse {
|
||||||
|
@ -657,78 +632,4 @@ pub(crate) mod test {
|
||||||
|
|
||||||
assert!(identity.is_device_signed(&device).is_ok())
|
assert!(identity.is_device_signed(&device).is_ok())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
|
||||||
async fn test_session_invalidation() {
|
|
||||||
let manager = manager();
|
|
||||||
let room_id = room_id();
|
|
||||||
let user_id = other_user_id();
|
|
||||||
let device_id: DeviceIdBox = "SKISMLNIMH".into();
|
|
||||||
|
|
||||||
manager
|
|
||||||
.group_manager
|
|
||||||
.create_outbound_group_session(&room_id, Default::default())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let session = manager
|
|
||||||
.group_manager
|
|
||||||
.get_outbound_group_session(&room_id)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
session.add_recipient(&user_id);
|
|
||||||
session.mark_as_shared();
|
|
||||||
|
|
||||||
assert!(!session.invalidated());
|
|
||||||
assert!(!session.expired());
|
|
||||||
|
|
||||||
// Receiving a new device invalidates the session.
|
|
||||||
manager
|
|
||||||
.receive_keys_query_response(&other_key_query())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert!(session.invalidated());
|
|
||||||
|
|
||||||
manager
|
|
||||||
.group_manager
|
|
||||||
.create_outbound_group_session(&room_id, Default::default())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let session = manager
|
|
||||||
.group_manager
|
|
||||||
.get_outbound_group_session(&room_id)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
session.add_recipient(&user_id);
|
|
||||||
session.mark_as_shared();
|
|
||||||
|
|
||||||
assert!(!session.invalidated());
|
|
||||||
assert!(!session.expired());
|
|
||||||
|
|
||||||
let device = manager
|
|
||||||
.store
|
|
||||||
.get_device(&user_id, &device_id)
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert!(!device.deleted());
|
|
||||||
|
|
||||||
let response = KeyQueryResponse::try_from(response_from_file(&json!({
|
|
||||||
"device_keys": {
|
|
||||||
user_id: {}
|
|
||||||
},
|
|
||||||
"failures": {},
|
|
||||||
})))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Noticing that a device got deleted invalidates the session as well
|
|
||||||
manager
|
|
||||||
.receive_keys_query_response(&response)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert!(device.deleted());
|
|
||||||
assert!(session.invalidated());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -178,12 +178,8 @@ impl OlmMachine {
|
||||||
store.clone(),
|
store.clone(),
|
||||||
);
|
);
|
||||||
let group_session_manager = GroupSessionManager::new(account.clone(), store.clone());
|
let group_session_manager = GroupSessionManager::new(account.clone(), store.clone());
|
||||||
let identity_manager = IdentityManager::new(
|
let identity_manager =
|
||||||
user_id.clone(),
|
IdentityManager::new(user_id.clone(), device_id.clone(), store.clone());
|
||||||
device_id.clone(),
|
|
||||||
store.clone(),
|
|
||||||
group_session_manager.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
OlmMachine {
|
OlmMachine {
|
||||||
user_id,
|
user_id,
|
||||||
|
@ -662,19 +658,6 @@ impl OlmMachine {
|
||||||
self.group_session_manager.encrypt(room_id, content).await
|
self.group_session_manager.encrypt(room_id, content).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Should the client share a group session for the given room.
|
|
||||||
///
|
|
||||||
/// Returns true if a session needs to be shared before room messages can be
|
|
||||||
/// encrypted, false if one is already shared and ready to encrypt room
|
|
||||||
/// messages.
|
|
||||||
///
|
|
||||||
/// This should be called every time a new room message wants to be sent out
|
|
||||||
/// since group sessions can expire at any time.
|
|
||||||
pub fn should_share_group_session(&self, room_id: &RoomId) -> bool {
|
|
||||||
self.group_session_manager
|
|
||||||
.should_share_group_session(room_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Invalidate the currently active outbound group session for the given
|
/// Invalidate the currently active outbound group session for the given
|
||||||
/// room.
|
/// room.
|
||||||
///
|
///
|
||||||
|
@ -1447,7 +1430,8 @@ pub(crate) mod test {
|
||||||
assert!(machine
|
assert!(machine
|
||||||
.group_session_manager
|
.group_session_manager
|
||||||
.get_outbound_group_session(&room_id)
|
.get_outbound_group_session(&room_id)
|
||||||
.is_none());
|
.unwrap()
|
||||||
|
.invalidated());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
@ -60,7 +60,7 @@ const ROTATION_MESSAGES: u64 = 100;
|
||||||
/// Settings for an encrypted room.
|
/// Settings for an encrypted room.
|
||||||
///
|
///
|
||||||
/// This determines the algorithm and rotation periods of a group session.
|
/// This determines the algorithm and rotation periods of a group session.
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct EncryptionSettings {
|
pub struct EncryptionSettings {
|
||||||
/// The encryption algorithm that should be used in the room.
|
/// The encryption algorithm that should be used in the room.
|
||||||
pub algorithm: EventEncryptionAlgorithm,
|
pub algorithm: EventEncryptionAlgorithm,
|
||||||
|
@ -113,7 +113,7 @@ pub struct OutboundGroupSession {
|
||||||
shared: Arc<AtomicBool>,
|
shared: Arc<AtomicBool>,
|
||||||
invalidated: Arc<AtomicBool>,
|
invalidated: Arc<AtomicBool>,
|
||||||
settings: Arc<EncryptionSettings>,
|
settings: Arc<EncryptionSettings>,
|
||||||
shared_with_set: Arc<DashMap<UserId, DashSet<DeviceIdBox>>>,
|
pub(crate) shared_with_set: Arc<DashMap<UserId, DashSet<DeviceIdBox>>>,
|
||||||
to_share_with_set: Arc<DashMap<Uuid, Arc<ToDeviceRequest>>>,
|
to_share_with_set: Arc<DashMap<Uuid, Arc<ToDeviceRequest>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,14 +162,9 @@ impl OutboundGroupSession {
|
||||||
self.to_share_with_set.insert(request_id, request);
|
self.to_share_with_set.insert(request_id, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_recipient(&self, user_id: &UserId) {
|
/// This should be called if an the user wishes to rotate this session.
|
||||||
self.shared_with_set
|
pub fn invalidate_session(&self) {
|
||||||
.entry(user_id.to_owned())
|
self.invalidated.store(true, Ordering::Relaxed)
|
||||||
.or_insert_with(DashSet::new);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn contains_recipient(&self, user_id: &UserId) -> bool {
|
|
||||||
self.shared_with_set.contains_key(user_id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark the request with the given request id as sent.
|
/// Mark the request with the given request id as sent.
|
||||||
|
@ -346,25 +341,6 @@ impl OutboundGroupSession {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark the session as invalid.
|
|
||||||
///
|
|
||||||
/// This should be called if an user/device deletes a device that received
|
|
||||||
/// this session.
|
|
||||||
pub fn invalidate_session(&self) {
|
|
||||||
self.invalidated.store(true, Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Clear out the requests returning the request ids.
|
|
||||||
pub fn clear_requests(&self) -> Vec<Uuid> {
|
|
||||||
let request_ids = self
|
|
||||||
.to_share_with_set
|
|
||||||
.iter()
|
|
||||||
.map(|item| *item.key())
|
|
||||||
.collect();
|
|
||||||
self.to_share_with_set.clear();
|
|
||||||
request_ids
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Has or will the session be shared with the given user/device pair.
|
/// Has or will the session be shared with the given user/device pair.
|
||||||
pub(crate) fn is_shared_with(&self, user_id: &UserId, device_id: &DeviceId) -> bool {
|
pub(crate) fn is_shared_with(&self, user_id: &UserId, device_id: &DeviceId) -> bool {
|
||||||
let shared_with = self
|
let shared_with = self
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, HashSet},
|
collections::{BTreeMap, HashMap, HashSet},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ use dashmap::DashMap;
|
||||||
use matrix_sdk_common::{
|
use matrix_sdk_common::{
|
||||||
api::r0::to_device::DeviceIdOrAllDevices,
|
api::r0::to_device::DeviceIdOrAllDevices,
|
||||||
events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, EventType},
|
events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, EventType},
|
||||||
identifiers::{RoomId, UserId},
|
identifiers::{DeviceId, DeviceIdBox, RoomId, UserId},
|
||||||
uuid::Uuid,
|
uuid::Uuid,
|
||||||
};
|
};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
@ -58,7 +58,12 @@ impl GroupSessionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn invalidate_group_session(&self, room_id: &RoomId) -> bool {
|
pub fn invalidate_group_session(&self, room_id: &RoomId) -> bool {
|
||||||
self.outbound_group_sessions.remove(room_id).is_some()
|
if let Some(s) = self.outbound_group_sessions.get(room_id) {
|
||||||
|
s.invalidate_session();
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn mark_request_as_sent(&self, request_id: &Uuid) {
|
pub fn mark_request_as_sent(&self, request_id: &Uuid) {
|
||||||
|
@ -67,25 +72,6 @@ impl GroupSessionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn invalidate_sessions_new_devices(&self, users: &HashSet<&UserId>) {
|
|
||||||
for session in self.outbound_group_sessions.iter() {
|
|
||||||
if users.iter().any(|u| session.contains_recipient(u)) {
|
|
||||||
info!(
|
|
||||||
"Invalidating outobund session {} for room {}",
|
|
||||||
session.session_id(),
|
|
||||||
session.room_id()
|
|
||||||
);
|
|
||||||
session.invalidate_session();
|
|
||||||
|
|
||||||
if !session.shared() {
|
|
||||||
for request_id in session.clear_requests() {
|
|
||||||
self.outbound_sessions_being_shared.remove(&request_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get an outbound group session for a room, if one exists.
|
/// Get an outbound group session for a room, if one exists.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
|
@ -115,23 +101,6 @@ impl GroupSessionManager {
|
||||||
Ok(session.encrypt(content).await)
|
Ok(session.encrypt(content).await)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Should the client share a group session for the given room.
|
|
||||||
///
|
|
||||||
/// Returns true if a session needs to be shared before room messages can be
|
|
||||||
/// encrypted, false if one is already shared and ready to encrypt room
|
|
||||||
/// messages.
|
|
||||||
///
|
|
||||||
/// This should be called every time a new room message wants to be sent out
|
|
||||||
/// since group sessions can expire at any time.
|
|
||||||
pub fn should_share_group_session(&self, room_id: &RoomId) -> bool {
|
|
||||||
let session = self.outbound_group_sessions.get(room_id);
|
|
||||||
|
|
||||||
match session {
|
|
||||||
Some(s) => !s.shared() || s.expired() || s.invalidated(),
|
|
||||||
None => true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a new outbound group session.
|
/// Create a new outbound group session.
|
||||||
///
|
///
|
||||||
/// This also creates a matching inbound group session and saves that one in
|
/// This also creates a matching inbound group session and saves that one in
|
||||||
|
@ -153,6 +122,26 @@ impl GroupSessionManager {
|
||||||
Ok((outbound, inbound))
|
Ok((outbound, inbound))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_or_create_outbound_session(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
settings: EncryptionSettings,
|
||||||
|
) -> OlmResult<(OutboundGroupSession, Option<InboundGroupSession>)> {
|
||||||
|
if let Some(s) = self.outbound_group_sessions.get(room_id).map(|s| s.clone()) {
|
||||||
|
if s.expired() || s.invalidated() {
|
||||||
|
self.create_outbound_group_session(room_id, settings)
|
||||||
|
.await
|
||||||
|
.map(|(o, i)| (o, i.into()))
|
||||||
|
} else {
|
||||||
|
Ok((s, None))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.create_outbound_group_session(room_id, settings)
|
||||||
|
.await
|
||||||
|
.map(|(o, i)| (o, i.into()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Get to-device requests to share a group session with users in a room.
|
/// Get to-device requests to share a group session with users in a room.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
|
@ -167,23 +156,96 @@ impl GroupSessionManager {
|
||||||
users: impl Iterator<Item = &UserId>,
|
users: impl Iterator<Item = &UserId>,
|
||||||
encryption_settings: impl Into<EncryptionSettings>,
|
encryption_settings: impl Into<EncryptionSettings>,
|
||||||
) -> OlmResult<Vec<Arc<ToDeviceRequest>>> {
|
) -> OlmResult<Vec<Arc<ToDeviceRequest>>> {
|
||||||
|
let users: HashSet<&UserId> = users.collect();
|
||||||
|
let encryption_settings = encryption_settings.into();
|
||||||
let mut changes = Changes::default();
|
let mut changes = Changes::default();
|
||||||
|
|
||||||
let (session, inbound_session) = self
|
let (outbound, inbound) = self
|
||||||
.create_outbound_group_session(room_id, encryption_settings.into())
|
.get_or_create_outbound_session(room_id, encryption_settings.clone())
|
||||||
.await?;
|
.await?;
|
||||||
changes.inbound_group_sessions.push(inbound_session);
|
|
||||||
|
|
||||||
let mut devices: Vec<Device> = Vec::new();
|
if let Some(inbound) = inbound {
|
||||||
|
changes.inbound_group_sessions.push(inbound);
|
||||||
for user_id in users {
|
|
||||||
session.add_recipient(user_id);
|
|
||||||
let user_devices = self.store.get_user_devices(&user_id).await?;
|
|
||||||
devices.extend(user_devices.devices().filter(|d| !d.is_blacklisted()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut devices: HashMap<UserId, Vec<Device>> = HashMap::new();
|
||||||
|
|
||||||
|
let users_shared_with: HashSet<UserId> = outbound
|
||||||
|
.shared_with_set
|
||||||
|
.iter()
|
||||||
|
.map(|k| k.key().clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let users_shared_with: HashSet<&UserId> = users_shared_with.iter().collect();
|
||||||
|
|
||||||
|
let user_left = !users_shared_with
|
||||||
|
.difference(&users)
|
||||||
|
.collect::<HashSet<&&UserId>>()
|
||||||
|
.is_empty();
|
||||||
|
|
||||||
|
let mut device_got_deleted = false;
|
||||||
|
|
||||||
|
for user_id in users {
|
||||||
|
let user_devices = self.store.get_user_devices(&user_id).await?;
|
||||||
|
|
||||||
|
if !device_got_deleted {
|
||||||
|
let device_ids: HashSet<DeviceIdBox> =
|
||||||
|
user_devices.keys().map(|d| d.clone()).collect();
|
||||||
|
|
||||||
|
device_got_deleted = if let Some(shared) = outbound.shared_with_set.get(user_id) {
|
||||||
|
let shared: HashSet<DeviceIdBox> = shared.iter().map(|d| d.clone()).collect();
|
||||||
|
!shared
|
||||||
|
.difference(&device_ids)
|
||||||
|
.collect::<HashSet<_>>()
|
||||||
|
.is_empty()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
devices
|
||||||
|
.entry(user_id.clone())
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.extend(user_devices.devices().filter(|d| !d.is_blacklisted()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let outbound = if user_left || device_got_deleted {
|
||||||
|
let (outbound, inbound) = self
|
||||||
|
.create_outbound_group_session(room_id, encryption_settings)
|
||||||
|
.await?;
|
||||||
|
changes.inbound_group_sessions.push(inbound);
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"A user/device has left the group {} since we last sent a message, \
|
||||||
|
rotating the outbound session.",
|
||||||
|
room_id
|
||||||
|
);
|
||||||
|
|
||||||
|
outbound
|
||||||
|
} else {
|
||||||
|
outbound
|
||||||
|
};
|
||||||
|
|
||||||
|
let devices: Vec<Device> = devices
|
||||||
|
.into_iter()
|
||||||
|
.map(|(_, d)| {
|
||||||
|
d.into_iter()
|
||||||
|
.filter(|d| !outbound.is_shared_with(d.user_id(), d.device_id()))
|
||||||
|
})
|
||||||
|
.flatten()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Sharing outbound session at index {} with {:?}",
|
||||||
|
outbound.message_index().await,
|
||||||
|
devices
|
||||||
|
.iter()
|
||||||
|
.map(|d| (d.user_id(), d.device_id()))
|
||||||
|
.collect::<Vec<(&UserId, &DeviceId)>>()
|
||||||
|
);
|
||||||
|
|
||||||
let mut requests = Vec::new();
|
let mut requests = Vec::new();
|
||||||
let key_content = session.as_json().await;
|
let key_content = outbound.as_json().await;
|
||||||
|
|
||||||
for device_map_chunk in devices.chunks(Self::MAX_TO_DEVICE_MESSAGES) {
|
for device_map_chunk in devices.chunks(Self::MAX_TO_DEVICE_MESSAGES) {
|
||||||
let mut messages = BTreeMap::new();
|
let mut messages = BTreeMap::new();
|
||||||
|
@ -221,19 +283,19 @@ impl GroupSessionManager {
|
||||||
messages,
|
messages,
|
||||||
});
|
});
|
||||||
|
|
||||||
session.add_request(id, request.clone());
|
outbound.add_request(id, request.clone());
|
||||||
self.outbound_sessions_being_shared
|
self.outbound_sessions_being_shared
|
||||||
.insert(id, session.clone());
|
.insert(id, outbound.clone());
|
||||||
requests.push(request);
|
requests.push(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
if requests.is_empty() {
|
if requests.is_empty() {
|
||||||
debug!(
|
debug!(
|
||||||
"Session {} for room {} doesn't need to be shared with anyone, marking as shared",
|
"Session {} for room {} doesn't need to be shared with anyone, marking as shared",
|
||||||
session.session_id(),
|
outbound.session_id(),
|
||||||
session.room_id()
|
outbound.room_id()
|
||||||
);
|
);
|
||||||
session.mark_as_shared();
|
outbound.mark_as_shared();
|
||||||
}
|
}
|
||||||
|
|
||||||
self.store.save_changes(changes).await?;
|
self.store.save_changes(changes).await?;
|
||||||
|
|
Loading…
Reference in New Issue