crypto: Return a deserialized ToDevice struct when we receive a sync

master
Damir Jelić 2021-01-19 12:59:31 +01:00
parent 6a30514d40
commit 17f3dbb0a0
5 changed files with 53 additions and 32 deletions

View File

@ -27,6 +27,7 @@ use std::{
#[cfg(feature = "encryption")]
use dashmap::DashMap;
#[cfg(feature = "encryption")]
use futures::TryStreamExt;
use futures_timer::Delay as sleep;
use http::HeaderValue;
@ -121,7 +122,6 @@ use matrix_sdk_common::{
use crate::{
http_client::{client_with_config, HttpClient, HttpSend},
verification_request::VerificationRequest,
Error, OutgoingRequest, Result,
};
@ -130,6 +130,7 @@ use crate::{
device::{Device, UserDevices},
identifiers::DeviceId,
sas::Sas,
verification_request::VerificationRequest,
};
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
@ -1176,6 +1177,7 @@ impl Client {
Ok(())
}
#[cfg(feature = "encryption")]
pub(crate) async fn room_send_helper(
&self,
request: &RoomMessageRequest,
@ -1270,6 +1272,7 @@ impl Client {
.unwrap_or(false)
}
#[cfg(feature = "encryption")]
async fn are_members_synced(&self, room_id: &RoomId) -> bool {
self.base_client
.get_room(room_id)

View File

@ -24,10 +24,13 @@ use std::{
};
#[cfg(feature = "encryption")]
use futures::StreamExt;
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use matrix_sdk_common::{
api::r0 as api,
deserialized_responses::{
AccountData, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, MemberEvent,
Presence, Rooms, State, StrippedMemberEvent, SyncResponse, Timeline,
},
events::{
presence::PresenceEvent,
room::member::{MemberEventContent, MembershipState},
@ -41,10 +44,6 @@ use matrix_sdk_common::{
#[cfg(feature = "encryption")]
use matrix_sdk_common::{
api::r0::keys::claim_keys::Request as KeysClaimRequest,
deserialized_responses::{
AccountData, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, MemberEvent,
Presence, Rooms, State, StrippedMemberEvent, SyncResponse, Timeline,
},
events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, AnySyncMessageEvent},
identifiers::DeviceId,
locks::Mutex,
@ -675,7 +674,7 @@ impl BaseClient {
/// * `response` - The response that we received after a successful sync.
pub async fn receive_sync_response(
&self,
mut response: api::sync::sync_events::Response,
response: api::sync::sync_events::Response,
) -> Result<SyncResponse> {
// The server might respond multiple times with the same sync token, in
// that case we already received this response and there's nothing to
@ -687,7 +686,7 @@ impl BaseClient {
let now = SystemTime::now();
#[cfg(feature = "encryption")]
{
let to_device = {
let olm = self.olm.lock().await;
if let Some(o) = &*olm {
@ -695,9 +694,25 @@ impl BaseClient {
// decryptes to-device events, but leaves room events alone.
// This makes sure that we have the deryption keys for the room
// events at hand.
o.receive_sync_response(&mut response).await?;
}
o.receive_sync_response(&response).await?
} else {
response
.to_device
.events
.into_iter()
.filter_map(|e| e.deserialize().ok())
.collect::<Vec<AnyToDeviceEvent>>()
.into()
}
};
#[cfg(not(feature = "encryption"))]
let to_device = response
.to_device
.events
.into_iter()
.filter_map(|e| e.deserialize().ok())
.collect::<Vec<AnyToDeviceEvent>>()
.into();
let mut changes = StateChanges::new(response.next_batch.clone());
let mut rooms = Rooms::default();
@ -868,13 +883,7 @@ impl BaseClient {
account_data: AccountData {
events: changes.account_data.into_iter().map(|(_, e)| e).collect(),
},
to_device: response
.to_device
.events
.into_iter()
.filter_map(|e| e.deserialize().ok())
.collect::<Vec<AnyToDeviceEvent>>()
.into(),
to_device,
device_lists: response.device_lists,
device_one_time_keys_count: response
.device_one_time_keys_count

View File

@ -36,7 +36,6 @@ use matrix_sdk_common::{
},
identifiers::{DeviceId, DeviceIdBox, EventEncryptionAlgorithm, RoomId, UserId},
uuid::Uuid,
Raw,
};
use crate::{
@ -623,8 +622,7 @@ impl KeyRequestMachine {
&self,
sender_key: &str,
event: &mut ToDeviceEvent<ForwardedRoomKeyToDeviceEventContent>,
) -> Result<(Option<Raw<AnyToDeviceEvent>>, Option<InboundGroupSession>), CryptoStoreError>
{
) -> Result<(Option<AnyToDeviceEvent>, Option<InboundGroupSession>), CryptoStoreError> {
let key_info = self.get_key_info(&event.content).await?;
if let Some(info) = key_info {
@ -658,7 +656,7 @@ impl KeyRequestMachine {
};
Ok((
Some(Raw::from(AnyToDeviceEvent::ForwardedRoomKey(event.clone()))),
Some(AnyToDeviceEvent::ForwardedRoomKey(event.clone())),
session,
))
} else {

View File

@ -30,6 +30,7 @@ use matrix_sdk_common::{
sync::sync_events::Response as SyncResponse,
},
assign,
deserialized_responses::ToDevice,
events::{
room::encrypted::EncryptedEventContent, room_key::RoomKeyEventContent,
AnyMessageEventContent, AnySyncRoomEvent, AnyToDeviceEvent, SyncMessageEvent,
@ -573,7 +574,7 @@ impl OlmMachine {
// don't want them to be able to do silly things with it. Handling
// events modifies them and returns a modified one, so replace it
// here if we get one.
decrypted.event = event;
decrypted.deserialized_event = Some(event);
decrypted.inbound_group_session = group_session;
}
@ -586,7 +587,7 @@ impl OlmMachine {
sender_key: &str,
signing_key: &str,
event: &mut ToDeviceEvent<RoomKeyEventContent>,
) -> OlmResult<(Option<Raw<AnyToDeviceEvent>>, Option<InboundGroupSession>)> {
) -> OlmResult<(Option<AnyToDeviceEvent>, Option<InboundGroupSession>)> {
match event.content.algorithm {
EventEncryptionAlgorithm::MegolmV1AesSha2 => {
let session_key = GroupSessionKey(mem::take(&mut event.content.session_key));
@ -597,7 +598,7 @@ impl OlmMachine {
&event.content.room_id,
session_key,
)?;
let event = Raw::from(AnyToDeviceEvent::RoomKey(event.clone()));
let event = AnyToDeviceEvent::RoomKey(event.clone());
Ok((Some(event), Some(session)))
}
_ => {
@ -697,7 +698,7 @@ impl OlmMachine {
async fn handle_decrypted_to_device_event(
&self,
decrypted: &OlmDecryptionInfo,
) -> OlmResult<(Option<Raw<AnyToDeviceEvent>>, Option<InboundGroupSession>)> {
) -> OlmResult<(Option<AnyToDeviceEvent>, Option<InboundGroupSession>)> {
let event = match decrypted.event.deserialize() {
Ok(e) => e,
Err(e) => {
@ -719,7 +720,7 @@ impl OlmMachine {
.await?),
_ => {
warn!("Received an unexpected encrypted to-device event");
Ok((None, None))
Ok((Some(event), None))
}
}
}
@ -774,7 +775,7 @@ impl OlmMachine {
/// * `response` - The sync latest sync response.
///
/// [`decrypt_room_event`]: #method.decrypt_room_event
pub async fn receive_sync_response(&self, response: &mut SyncResponse) -> OlmResult<()> {
pub async fn receive_sync_response(&self, response: &SyncResponse) -> OlmResult<ToDevice> {
// Remove verification objects that have expired or are done.
self.verification_machine.garbage_collect();
@ -794,7 +795,9 @@ impl OlmMachine {
}
}
for event_result in &mut response.to_device.events {
let mut events = Vec::new();
for event_result in &response.to_device.events {
let mut event = if let Ok(e) = event_result.deserialize() {
e
} else {
@ -849,7 +852,9 @@ impl OlmMachine {
changes.inbound_group_sessions.push(group_session);
}
*event_result = decrypted.event;
if let Some(e) = decrypted.deserialized_event {
event = e;
}
}
AnyToDeviceEvent::RoomKeyRequest(e) => {
self.key_request_machine.receive_incoming_key_request(e)
@ -864,6 +869,8 @@ impl OlmMachine {
}
_ => continue,
}
events.push(event);
}
let changed_sessions = self
@ -873,7 +880,9 @@ impl OlmMachine {
changes.sessions.extend(changed_sessions);
Ok(self.store.save_changes(changes).await?)
self.store.save_changes(changes).await?;
Ok(ToDevice { events })
}
/// Decrypt an event from a room timeline.
@ -1661,7 +1670,7 @@ pub(crate) mod test {
.save_inbound_group_sessions(&[decrypted.inbound_group_session.unwrap()])
.await
.unwrap();
let event = decrypted.event.deserialize().unwrap();
let event = decrypted.deserialized_event.unwrap();
if let AnyToDeviceEvent::RoomKey(event) = event {
assert_eq!(&event.sender, alice.user_id());

View File

@ -91,6 +91,7 @@ impl SessionType {
pub struct OlmDecryptionInfo {
pub session: SessionType,
pub message_hash: OlmMessageHash,
pub deserialized_event: Option<AnyToDeviceEvent>,
pub event: Raw<AnyToDeviceEvent>,
pub signing_key: String,
pub sender_key: String,
@ -179,6 +180,7 @@ impl Account {
message_hash,
event,
signing_key,
deserialized_event: None,
sender_key: content.sender_key.clone(),
inbound_group_session: None,
})