client: Refactor out the group session sharing logic.

master
Damir Jelić 2020-08-12 15:21:30 +02:00
parent 15d7deddb8
commit 41f04d4f5d
1 changed files with 57 additions and 42 deletions

View File

@ -946,6 +946,62 @@ impl Client {
self.send(request).await self.send(request).await
} }
/// Share a group session for the given room.
///
/// This will create Olm sessions with all the users/device pairs in the
/// room if necessary and share a group session with them.
///
/// Does nothing if no group session needs to be shared.
#[cfg(feature = "encryption")]
async fn preshare_group_session(&self, room_id: &RoomId) -> Result<()> {
// TODO expose this publicly so people can pre-share a group session if
// e.g. a user starts to type a message for a room.
if self.base_client.should_share_group_session(room_id).await {
#[allow(clippy::map_clone)]
if let Some(mutex) = self.group_session_locks.get(room_id).map(|m| m.clone()) {
// If a group session share request is already going on,
// await the release of the lock.
mutex.lock().await;
} else {
// Otherwise create a new lock and share the group
// session.
let mutex = Arc::new(Mutex::new(()));
self.group_session_locks
.insert(room_id.clone(), mutex.clone());
let _guard = mutex.lock().await;
let missing_sessions = {
let room = self.base_client.get_joined_room(room_id).await;
let room = room.as_ref().unwrap().read().await;
let members = room
.joined_members
.keys()
.chain(room.invited_members.keys());
self.base_client.get_missing_sessions(members).await?
};
if !missing_sessions.is_empty() {
self.claim_one_time_keys(missing_sessions).await?;
}
let response = self.share_group_session(room_id).await;
self.group_session_locks.remove(room_id);
// If one of the responses failed invalidate the group
// session as using it would end up in undecryptable
// messages.
if let Err(r) = response {
self.base_client.invalidate_group_session(room_id).await;
return Err(r);
}
}
}
Ok(())
}
/// Send a room message to the homeserver. /// Send a room message to the homeserver.
/// ///
/// Returns the parsed response from the server. /// Returns the parsed response from the server.
@ -1007,48 +1063,7 @@ impl Client {
}; };
if encrypted { if encrypted {
if self.base_client.should_share_group_session(room_id).await { self.preshare_group_session(room_id).await?;
#[allow(clippy::map_clone)]
if let Some(mutex) = self.group_session_locks.get(room_id).map(|m| m.clone()) {
// If a group session share request is already going on,
// await the release of the lock.
mutex.lock().await;
} else {
// Otherwise create a new lock and share the group
// session.
let mutex = Arc::new(Mutex::new(()));
self.group_session_locks
.insert(room_id.clone(), mutex.clone());
let _guard = mutex.lock().await;
let missing_sessions = {
let room = self.base_client.get_joined_room(room_id).await;
let room = room.as_ref().unwrap().read().await;
let members = room
.joined_members
.keys()
.chain(room.invited_members.keys());
self.base_client.get_missing_sessions(members).await?
};
if !missing_sessions.is_empty() {
self.claim_one_time_keys(missing_sessions).await?;
}
let response = self.share_group_session(room_id).await;
self.group_session_locks.remove(room_id);
// If one of the responses failed invalidate the group
// session as using it would end up in undecryptable
// messages.
if let Err(r) = response {
self.base_client.invalidate_group_session(room_id).await;
return Err(r);
}
}
}
raw_content = serde_json::value::to_raw_value( raw_content = serde_json::value::to_raw_value(
&self.base_client.encrypt(room_id, content).await?, &self.base_client.encrypt(room_id, content).await?,