diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 2128e92a..1de3d00e 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -40,7 +40,7 @@ use zeroize::Zeroizing; use tracing::{debug, warn}; use tracing::{error, info, instrument}; -use matrix_sdk_base::{BaseClient, BaseClientConfig, Room, Session}; +use matrix_sdk_base::{responses::SyncResponse, BaseClient, BaseClientConfig, Room, Session}; #[cfg(feature = "encryption")] use matrix_sdk_base::crypto::{ @@ -1119,7 +1119,7 @@ impl Client { async fn are_members_synced(&self, room_id: &RoomId) -> bool { match self.base_client.get_joined_room(room_id) { - Some(r) => r.is_encrypted(), + Some(r) => r.are_members_synced(), None => true, } } @@ -1468,10 +1468,7 @@ impl Client { /// /// [`sync`]: #method.sync #[instrument] - pub async fn sync_once( - &self, - sync_settings: SyncSettings<'_>, - ) -> Result { + pub async fn sync_once(&self, sync_settings: SyncSettings<'_>) -> Result { let request = assign!(sync_events::Request::new(), { filter: sync_settings.filter.as_ref(), since: sync_settings.token.as_deref(), @@ -1482,11 +1479,10 @@ impl Client { let mut response = self.send(request).await?; - self.base_client + Ok(self + .base_client .receive_sync_response(&mut response) - .await?; - - Ok(response) + .await?) } /// Repeatedly call sync to synchronize the client state with the server. @@ -1568,7 +1564,7 @@ impl Client { pub async fn sync_with_callback( &self, sync_settings: SyncSettings<'_>, - callback: impl Fn(sync_events::Response) -> C, + callback: impl Fn(SyncResponse) -> C, ) where C: Future, { diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index f064d4f1..b0bb0a4f 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -39,7 +39,7 @@ use matrix_sdk_common::{ #[cfg(feature = "encryption")] use matrix_sdk_common::{ api::r0::keys::claim_keys::Request as KeysClaimRequest, - events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent}, + events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, AnySyncMessageEvent}, identifiers::DeviceId, uuid::Uuid, }; @@ -49,11 +49,12 @@ use matrix_sdk_crypto::{ Device, EncryptionSettings, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas, ToDeviceRequest, UserDevices, }; -use tracing::info; +use tracing::{info, warn}; use zeroize::Zeroizing; use crate::{ error::Result, + responses::{JoinedRoom, Rooms, State, SyncResponse, Timeline}, session::Session, store::{Room, RoomType, StateChanges, Store}, }; @@ -125,7 +126,9 @@ fn hoist_room_event_prev_content( ) -> StdResult { let prev_content = serde_json::from_str::(event.json().get()) .map(|more_unsigned| more_unsigned.unsigned) - .map(|additional| additional.prev_content)?; + .map(|additional| additional.prev_content)? + .map(|p| p.deserialize().ok()) + .flatten(); let mut ev = event.deserialize()?; @@ -133,7 +136,7 @@ fn hoist_room_event_prev_content( AnySyncRoomEvent::State(AnySyncStateEvent::RoomMember(ref mut member)) if member.prev_content.is_none() => { - member.prev_content = prev_content.map(|p| p.deserialize().ok()).flatten(); + member.prev_content = prev_content; } _ => (), } @@ -152,19 +155,19 @@ fn stripped_deserialize_prev_content( fn handle_membership( changes: &mut StateChanges, room_id: &RoomId, - event: SyncStateEvent, + event: &SyncStateEvent, ) { use matrix_sdk_common::events::room::member::MembershipState::*; match &event.content.membership { Join => { info!("ADDING MEMBER {} to {}", event.state_key, room_id); - changes.add_joined_member(room_id, event) + changes.add_joined_member(room_id, event.clone()) // TODO check if the display name is // ambigous } Invite => { info!("ADDING INVITED MEMBER {} to {}", event.state_key, room_id); - changes.add_invited_member(room_id, event) + changes.add_invited_member(room_id, event.clone()) } _ => info!("UNHANDLED MEMBERSHIP"), } @@ -461,12 +464,12 @@ impl BaseClient { pub async fn receive_sync_response( &self, response: &mut api::sync::sync_events::Response, - ) -> Result<()> { + ) -> Result { // The server might respond multiple times with the same sync token, in // that case we already received this response and there's nothing to // do. if self.sync_token.read().await.as_ref() == Some(&response.next_batch) { - return Ok(()); + return Ok(SyncResponse::new_empty(response.next_batch.clone())); } #[cfg(feature = "encryption")] @@ -487,6 +490,8 @@ impl BaseClient { // apply and emit the new events and rooms. let mut changes = StateChanges::default(); + let mut rooms = Rooms::default(); + for (room_id, room_info) in &response.rooms.join { let room = self.get_or_create_room(room_id, RoomType::Joined).await; @@ -494,37 +499,60 @@ impl BaseClient { summary.update(&room_info.summary); summary.set_prev_batch(room_info.timeline.prev_batch.as_deref()); + let mut state = State::default(); + for e in &room_info.state.events { if let Ok(event) = hoist_and_deserialize_state_event(e) { - match event { + match &event { AnySyncStateEvent::RoomMember(member) => { handle_membership(&mut changes, room_id, member); } e => { summary.handle_state_event(&e); - changes.add_state_event(room_id, e); + changes.add_state_event(room_id, e.clone()); } } + + state.events.push(event); } } + let mut timeline = Timeline::new( + room_info.timeline.limited, + room_info.timeline.prev_batch.clone(), + ); + for event in &room_info.timeline.events { - if let Ok(e) = hoist_room_event_prev_content(event) { - match e { + if let Ok(mut e) = hoist_room_event_prev_content(event) { + match &mut e { AnySyncRoomEvent::State(s) => match s { AnySyncStateEvent::RoomMember(member) => { handle_membership(&mut changes, room_id, member); } _ => { summary.handle_state_event(&s); - changes.add_state_event(room_id, s); + changes.add_state_event(room_id, s.clone()); } }, - AnySyncRoomEvent::Message(_) => { - // TODO decrypt the event if it's an encrypted one. + AnySyncRoomEvent::Message(message) => + { + #[cfg(feature = "encryption")] + if let AnySyncMessageEvent::RoomEncrypted(encrypted) = message { + if let Some(olm) = self.olm_machine().await { + if let Ok(decrypted) = + olm.decrypt_room_event(encrypted, room_id).await + { + if let Ok(decrypted) = decrypted.deserialize() { + e = decrypted; + } + } + } + } } _ => (), } + + timeline.events.push(e); } } @@ -541,6 +569,14 @@ impl BaseClient { } } + rooms + .join + .insert(room_id.to_owned(), JoinedRoom::new(timeline, state)); + + if room_info.timeline.limited { + summary.mark_members_missing(); + } + changes.add_room(summary); } @@ -555,7 +591,7 @@ impl BaseClient { } } - Ok(()) + Ok(SyncResponse::new(response.next_batch.clone(), rooms)) } pub async fn receive_members( @@ -563,9 +599,14 @@ impl BaseClient { room_id: &RoomId, response: &api::membership::get_member_events::Response, ) -> Result<()> { - if self.get_joined_room(room_id).is_some() { + if let Some(room) = self.get_joined_room(room_id) { + let mut summary = room.clone_summary(); + summary.mark_members_synced(); + let mut changes = StateChanges::default(); + changes.add_room(summary); + // TODO make sure we don't overwrite memership events from a sync. for e in &response.chunk { if let Ok(event) = hoist_member_event(e) { @@ -576,7 +617,7 @@ impl BaseClient { .await .is_none() { - handle_membership(&mut changes, room_id, event.into()); + handle_membership(&mut changes, room_id, &event.into()); } } } diff --git a/matrix_sdk_base/src/lib.rs b/matrix_sdk_base/src/lib.rs index 64e1877b..1a3f39ca 100644 --- a/matrix_sdk_base/src/lib.rs +++ b/matrix_sdk_base/src/lib.rs @@ -44,6 +44,7 @@ pub use matrix_sdk_common::*; mod client; mod error; +pub mod responses; mod session; mod store; diff --git a/matrix_sdk_base/src/responses.rs b/matrix_sdk_base/src/responses.rs new file mode 100644 index 00000000..73558e64 --- /dev/null +++ b/matrix_sdk_base/src/responses.rs @@ -0,0 +1,142 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +use matrix_sdk_common::{ + events::{AnySyncRoomEvent, AnySyncStateEvent}, + identifiers::RoomId, +}; + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct SyncResponse { + /// The batch token to supply in the `since` param of the next `/sync` request. + pub next_batch: String, + /// Updates to rooms. + pub rooms: Rooms, + ///// Updates to the presence status of other users. + //#[serde(default, skip_serializing_if = "Presence::is_empty")] + //pub presence: Presence, + + ///// The global private data created by this user. + //#[serde(default, skip_serializing_if = "AccountData::is_empty")] + //pub account_data: AccountData, + + ///// Messages sent dirrectly between devices. + //#[serde(default, skip_serializing_if = "ToDevice::is_empty")] + //pub to_device: ToDevice, + + ///// Information on E2E device updates. + ///// + ///// Only present on an incremental sync. + //#[serde(default, skip_serializing_if = "DeviceLists::is_empty")] + //pub device_lists: DeviceLists, + + ///// For each key algorithm, the number of unclaimed one-time keys + ///// currently held on the server for a device. + //#[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + //pub device_one_time_keys_count: BTreeMap, +} + +impl SyncResponse { + pub fn new(next_batch: String, rooms: Rooms) -> Self { + Self { next_batch, rooms } + } + + pub fn new_empty(next_batch: String) -> Self { + Self { + next_batch, + ..Default::default() + } + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct Rooms { + // /// The rooms that the user has left or been banned from. + // #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + // pub leave: BTreeMap, + /// The rooms that the user has joined. + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub join: BTreeMap, + // /// The rooms that the user has been invited to. + // #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + // pub invite: BTreeMap, +} + +/// Updates to joined rooms. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct JoinedRoom { + // /// Information about the room which clients may need to correctly render it + // /// to users. + // #[serde(default, skip_serializing_if = "RoomSummary::is_empty")] + // pub summary: RoomSummary, + + // /// Counts of unread notifications for this room. + // #[serde(default, skip_serializing_if = "UnreadNotificationsCount::is_empty")] + // pub unread_notifications: UnreadNotificationsCount, + /// The timeline of messages and state changes in the room. + #[serde(default, skip_serializing_if = "Timeline::is_empty")] + pub timeline: Timeline, + + /// Updates to the state, between the time indicated by the `since` parameter, and the start + /// of the `timeline` (or all state up to the start of the `timeline`, if `since` is not + /// given, or `full_state` is true). + #[serde(default, skip_serializing_if = "State::is_empty")] + pub state: State, + // /// The private data that this user has attached to this room. + // #[serde(default, skip_serializing_if = "AccountData::is_empty")] + // pub account_data: AccountData, + + // /// The ephemeral events in the room that aren't recorded in the timeline or state of the + // /// room. e.g. typing. + // #[serde(default, skip_serializing_if = "Ephemeral::is_empty")] + // pub ephemeral: Ephemeral, +} + +impl JoinedRoom { + pub fn new(timeline: Timeline, state: State) -> Self { + Self { timeline, state } + } +} + +/// Events in the room. +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct Timeline { + /// True if the number of events returned was limited by the `limit` on the filter. + #[serde(default)] + pub limited: bool, + + /// A token that can be supplied to to the `from` parameter of the + /// `/rooms/{roomId}/messages` endpoint. + #[serde(skip_serializing_if = "Option::is_none")] + pub prev_batch: Option, + + /// A list of events. + pub events: Vec, +} + +impl Timeline { + pub fn new(limited: bool, prev_batch: Option) -> Self { + Self { + limited, + prev_batch, + ..Default::default() + } + } + + fn is_empty(&self) -> bool { + self.events.is_empty() + } +} + +/// State events in the room. +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct State { + /// A list of state events. + pub events: Vec, +} + +impl State { + fn is_empty(&self) -> bool { + self.events.is_empty() + } +} diff --git a/matrix_sdk_base/src/store.rs b/matrix_sdk_base/src/store.rs index 04fbc755..efd46d92 100644 --- a/matrix_sdk_base/src/store.rs +++ b/matrix_sdk_base/src/store.rs @@ -151,6 +151,7 @@ impl Room { encryption: None, summary: Default::default(), last_prev_batch: None, + members_synced: false, name: None, canonical_alias: None, avatar_url: None, @@ -158,8 +159,8 @@ impl Room { } } - pub async fn are_members_synced(&self) -> bool { - true + pub fn are_members_synced(&self) -> bool { + self.inner.lock().unwrap().members_synced } pub async fn get_j_members(&self) -> impl Stream + '_ { @@ -327,6 +328,7 @@ pub struct InnerSummary { avatar_url: Option, summary: SomeSummary, + members_synced: bool, encryption: Option, last_prev_batch: Option, @@ -345,6 +347,14 @@ impl InnerSummary { } } + pub fn mark_members_synced(&mut self) { + self.members_synced = true; + } + + pub fn mark_members_missing(&mut self) { + self.members_synced = false; + } + pub fn set_prev_batch(&mut self, prev_batch: Option<&str>) -> bool { if self.last_prev_batch.as_deref() != prev_batch { self.last_prev_batch = prev_batch.map(|p| p.to_string());