diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index cbb2a24f..4502f56a 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -14,25 +14,25 @@ // limitations under the License. use std::{ + collections::BTreeMap, convert::TryFrom, fmt, path::{Path, PathBuf}, result::Result as StdResult, sync::Arc, + time::SystemTime, }; use dashmap::DashMap; use futures::StreamExt; -#[cfg(feature = "encryption")] -use matrix_sdk_common::locks::Mutex; use matrix_sdk_common::{ api::r0 as api, events::{ room::member::MemberEventContent, AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent, - AnySyncStateEvent, StateEvent, SyncStateEvent, + AnySyncStateEvent, EventContent, StateEvent, }, - identifiers::{room_id, RoomId, UserId}, + identifiers::{RoomId, UserId}, locks::RwLock, Raw, }; @@ -44,21 +44,23 @@ use matrix_sdk_common::{ uuid::Uuid, }; #[cfg(feature = "encryption")] +use matrix_sdk_common::{events::presence::PresenceEvent, locks::Mutex}; +#[cfg(feature = "encryption")] use matrix_sdk_crypto::{ store::{CryptoStore, CryptoStoreError}, Device, EncryptionSettings, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas, ToDeviceRequest, UserDevices, }; -use tracing::{error, info, warn}; +use tracing::{info, warn}; use zeroize::Zeroizing; use crate::{ error::Result, responses::{ - AccountData, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, Presence, Rooms, - State, SyncResponse, Timeline, + AccountData, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, MemberEvent, + Presence, Rooms, State, StrippedMemberEvent, SyncResponse, Timeline, }, - rooms::{Room, RoomInfo, RoomType, StrippedRoom}, + rooms::{Room, RoomInfo, RoomType, StrippedRoom, StrippedRoomInfo}, session::Session, store::{StateChanges, Store}, }; @@ -148,27 +150,6 @@ fn hoist_room_event_prev_content( Ok(ev) } -fn handle_membership( - changes: &mut StateChanges, - room_id: &RoomId, - event: &SyncStateEvent, -) { - use matrix_sdk_common::events::room::member::MembershipState::*; - match &event.content.membership { - Join => { - info!("ADDING MEMBER {} to {}", event.state_key, room_id); - changes.add_joined_member(room_id, event.clone()) - // TODO check if the display name is - // ambigous - } - Invite => { - info!("ADDING INVITED MEMBER {} to {}", event.state_key, room_id); - changes.add_invited_member(room_id, event.clone()) - } - membership => info!("UNHANDLED MEMBERSHIP {} {:?}", event.state_key, membership), - } -} - /// Signals to the `BaseClient` which `RoomState` to send to `EventEmitter`. #[derive(Debug)] pub enum RoomStateType { @@ -471,18 +452,22 @@ impl BaseClient { async fn handle_timeline( &self, room_id: &RoomId, - ruma_timeline: &api::sync::sync_events::Timeline, + ruma_timeline: api::sync::sync_events::Timeline, summary: &mut RoomInfo, - mut changes: &mut StateChanges, + changes: &mut StateChanges, ) -> Timeline { let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone()); - for event in &ruma_timeline.events { - if let Ok(mut e) = hoist_room_event_prev_content(event) { + let mut members = BTreeMap::new(); + + for event in ruma_timeline.events { + if let Ok(mut e) = hoist_room_event_prev_content(&event) { match &mut e { AnySyncRoomEvent::State(s) => match s { AnySyncStateEvent::RoomMember(member) => { - handle_membership(&mut changes, room_id, member); + if let Ok(member) = MemberEvent::try_from(member.clone()) { + members.insert(member.state_key.clone(), member); + } } _ => { summary.handle_state_event(&s); @@ -517,39 +502,91 @@ impl BaseClient { timeline } - async fn handle_state( + fn handle_invited_state( &self, - room_id: &RoomId, - events: &[Raw], - summary: &mut RoomInfo, - mut changes: &mut StateChanges, - ) -> State { - let mut state = State::default(); + events: Vec>, + room_info: &mut StrippedRoomInfo, + ) -> ( + InviteState, + BTreeMap, + BTreeMap, + ) { + events.into_iter().fold( + (InviteState::default(), BTreeMap::new(), BTreeMap::new()), + |(mut state, mut members, mut state_events), e| { + match e.deserialize() { + Ok(e) => { + state.events.push(e.clone()); - for e in events { - match hoist_and_deserialize_state_event(e) { - Ok(event) => { - match &event { - AnySyncStateEvent::RoomMember(member) => { - handle_membership(&mut changes, room_id, member); - } - e => { - summary.handle_state_event(&e); - - changes.add_state_event(room_id, e.clone()); + if let AnyStrippedStateEvent::RoomMember(member) = e { + match StrippedMemberEvent::try_from(member) { + Ok(m) => { + members.insert(m.state_key.clone(), m); + } + Err(e) => warn!( + "Stripped member event in room {} has an invalid state key {:?}", + room_info.room_id, e + ), + } + } else { + room_info.handle_state_event(&e); + state_events.insert(e.content().event_type().to_owned(), e); } } - - state.events.push(event); + Err(err) => { + warn!( + "Couldn't deserialize stripped state event for room {}: {:?}", + room_info.room_id, err + ); + } } - Err(err) => warn!( - "Couldn't deserialize state event for room {}: {:?} {:#?}", - room_id, err, e - ), - } - } + (state, members, state_events) + }, + ) + } - state + fn handle_state( + &self, + events: Vec>, + room_info: &mut RoomInfo, + ) -> ( + State, + BTreeMap, + BTreeMap, + ) { + events.into_iter().fold( + (State::default(), BTreeMap::new(), BTreeMap::new()), + |(mut state, mut members, mut state_events), e| { + match hoist_and_deserialize_state_event(&e) { + Ok(e) => { + room_info.handle_state_event(&e); + state.events.push(e.clone()); + + if let AnySyncStateEvent::RoomMember(member) = e { + match MemberEvent::try_from(member) { + Ok(m) => { + members.insert(m.state_key.clone(), m); + } + Err(e) => warn!( + "Member event in room {} has an invalid state key {:?}", + room_info.room_id, e + ), + } + } else { + state_events.insert(e.content().event_type().to_owned(), e); + } + } + Err(err) => { + warn!( + "Couldn't deserialize state event for room {}: {:?} {:#?}", + room_info.room_id, err, e + ); + } + } + + (state, members, state_events) + }, + ) } async fn handle_room_account_data( @@ -584,6 +621,8 @@ impl BaseClient { return Ok(SyncResponse::new(response.next_batch)); } + let now = SystemTime::now(); + #[cfg(feature = "encryption")] { let olm = self.olm.lock().await; @@ -608,41 +647,38 @@ impl BaseClient { summary.update(&room_info.summary); summary.set_prev_batch(room_info.timeline.prev_batch.as_deref()); - let state = self - .handle_state( - &room_id, - &room_info.state.events, - &mut summary, - &mut changes, - ) - .await; + let (state, members, state_events) = + self.handle_state(room_info.state.events, &mut summary); + + changes.members.insert(room_id.clone(), members); + changes.state.insert(room_id.clone(), state_events); + + if room_info.timeline.limited { + summary.mark_members_missing(); + } let timeline = self - .handle_timeline(&room_id, &room_info.timeline, &mut summary, &mut changes) + .handle_timeline(&room_id, room_info.timeline, &mut summary, &mut changes) .await; let account_data = self .handle_room_account_data(&room_id, &room_info.account_data.events, &mut changes) .await; - #[cfg(feature = "encryption")] - if summary.is_encrypted() { - // TODO if the room isn't encrypted but the new summary is, - // add all the room users. - if let Some(o) = self.olm_machine().await { - if let Some(users) = changes.joined_user_ids.get(&room_id) { - o.update_tracked_users(users).await - } + // #[cfg(feature = "encryption")] + // if summary.is_encrypted() { + // // TODO if the room isn't encrypted but the new summary is, + // // add all the room users. + // if let Some(o) = self.olm_machine().await { + // if let Some(users) = changes.joined_user_ids.get(&room_id) { + // o.update_tracked_users(users).await + // } - if let Some(users) = changes.invited_user_ids.get(&room_id) { - o.update_tracked_users(users).await - } - } - } - - if room_info.timeline.limited { - summary.mark_members_missing(); - } + // if let Some(users) = changes.invited_user_ids.get(&room_id) { + // o.update_tracked_users(users).await + // } + // } + // } let notification_count = room_info.unread_notifications.into(); summary.update_notification_count(notification_count); @@ -670,17 +706,14 @@ impl BaseClient { let mut summary = room.clone_summary(); summary.mark_as_left(); - let state = self - .handle_state( - &room_id, - &room_info.state.events, - &mut summary, - &mut changes, - ) - .await; + let (state, members, state_events) = + self.handle_state(room_info.state.events, &mut summary); + + changes.members.insert(room_id.clone(), members); + changes.state.insert(room_id.clone(), state_events); let timeline = self - .handle_timeline(&room_id, &room_info.timeline, &mut summary, &mut changes) + .handle_timeline(&room_id, room_info.timeline, &mut summary, &mut changes) .await; let account_data = self @@ -700,26 +733,14 @@ impl BaseClient { changes.add_room(room_info); } - let mut state = InviteState::default(); - let room = self.get_or_create_stripped_room(&room_id).await; let mut room_info = room.clone_summary(); - for event in &invited.invite_state.events { - if let Ok(e) = event.deserialize() { - match &e { - AnyStrippedStateEvent::RoomMember(member) => { - changes.add_stripped_member(&room_id, member.clone()); - } - _ => { - room_info.handle_state_event(&e); - changes.add_stripped_state_event(&room_id, e.clone()); - } - } + let (state, members, state_events) = + self.handle_invited_state(invited.invite_state.events, &mut room_info); - state.events.push(e); - } - } + changes.stripped_members.insert(room_id.clone(), members); + changes.stripped_state.insert(room_id.clone(), state_events); let room = InvitedRoom { invite_state: state, @@ -728,22 +749,36 @@ impl BaseClient { rooms.invite.insert(room_id, room); } - for event in &response.presence.events { - if let Ok(e) = event.deserialize() { - changes.add_presence_event(e); - } - } + let presence: BTreeMap = response + .presence + .events + .into_iter() + .filter_map(|e| { + let event = e.deserialize().ok()?; + Some((event.sender.clone(), event)) + }) + .collect(); - for event in &response.account_data.events { - if let Ok(e) = event.deserialize() { - changes.add_account_data(e); - } - } + changes.presence = presence; + + let account_data: BTreeMap = response + .account_data + .events + .into_iter() + .filter_map(|e| { + let event = e.deserialize().ok()?; + Some((event.content().event_type().to_owned(), event)) + }) + .collect(); + + changes.account_data = account_data; self.store.save_changes(&changes).await; *self.sync_token.write().await = Some(response.next_batch.clone()); self.apply_changes(&changes).await; + info!("Processed a sync response in {:?}", now.elapsed().unwrap()); + Ok(SyncResponse { next_batch: response.next_batch, rooms, @@ -784,34 +819,47 @@ impl BaseClient { let mut changes = StateChanges::default(); - // TODO make sure we don't overwrite memership events from a sync. - for e in &response.chunk { - if let Ok(event) = hoist_member_event(e) { - if let Ok(user_id) = UserId::try_from(event.state_key.as_str()) { - if self - .store - .get_member_event(room_id, &user_id) - .await - .is_none() - { - handle_membership(&mut changes, room_id, &event.into()); + let members: BTreeMap = + futures::stream::iter(response.chunk.iter()) + .map(|e| { + hoist_member_event(e) + .ok() + .map(|e| MemberEvent::try_from(e).ok()) + .flatten() + }) + .filter_map(|e| async { + if let Some(e) = e { + if self + .store + .get_member_event(room_id, &e.state_key) + .await + .is_some() + { + Some((e.state_key.clone(), e)) + } else { + None + } + } else { + None } - } - } - } + }) + .collect() + .await; - #[cfg(feature = "encryption")] - if summary.is_encrypted() { - if let Some(o) = self.olm_machine().await { - if let Some(users) = changes.joined_user_ids.get(room_id) { - o.update_tracked_users(users).await - } + changes.members.insert(room_id.clone(), members); - if let Some(users) = changes.invited_user_ids.get(room_id) { - o.update_tracked_users(users).await - } - } - } + // #[cfg(feature = "encryption")] + // if summary.is_encrypted() { + // if let Some(o) = self.olm_machine().await { + // if let Some(users) = changes.joined_user_ids.get(room_id) { + // o.update_tracked_users(users).await + // } + + // if let Some(users) = changes.invited_user_ids.get(room_id) { + // o.update_tracked_users(users).await + // } + // } + // } changes.add_room(summary); diff --git a/matrix_sdk_base/src/responses.rs b/matrix_sdk_base/src/responses.rs index 345aec4f..5f3e7216 100644 --- a/matrix_sdk_base/src/responses.rs +++ b/matrix_sdk_base/src/responses.rs @@ -1,15 +1,16 @@ use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; +use std::{collections::BTreeMap, convert::TryFrom, time::SystemTime}; use matrix_sdk_common::{ api::r0::sync::sync_events::{ DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount, }, events::{ - presence::PresenceEvent, AnyBasicEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, - AnySyncRoomEvent, AnySyncStateEvent, AnyToDeviceEvent, + presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent, + AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent, + AnyToDeviceEvent, StateEvent, StrippedStateEvent, SyncStateEvent, Unsigned, }, - identifiers::{DeviceKeyAlgorithm, RoomId}, + identifiers::{DeviceKeyAlgorithm, EventId, RoomId, UserId}, }; #[derive(Clone, Debug, Default, Deserialize, Serialize)] @@ -204,3 +205,97 @@ pub struct State { /// A list of state events. pub events: Vec, } + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde( + try_from = "SyncStateEvent", + into = "SyncStateEvent" +)] +pub struct MemberEvent { + pub content: MemberEventContent, + pub event_id: EventId, + pub origin_server_ts: SystemTime, + pub prev_content: Option, + pub sender: UserId, + pub state_key: UserId, + pub unsigned: Unsigned, +} + +impl TryFrom> for MemberEvent { + type Error = matrix_sdk_common::identifiers::Error; + + fn try_from(event: SyncStateEvent) -> Result { + Ok(MemberEvent { + content: event.content, + event_id: event.event_id, + origin_server_ts: event.origin_server_ts, + prev_content: event.prev_content, + sender: event.sender, + state_key: UserId::try_from(event.state_key)?, + unsigned: event.unsigned, + }) + } +} + +impl TryFrom> for MemberEvent { + type Error = matrix_sdk_common::identifiers::Error; + + fn try_from(event: StateEvent) -> Result { + Ok(MemberEvent { + content: event.content, + event_id: event.event_id, + origin_server_ts: event.origin_server_ts, + prev_content: event.prev_content, + sender: event.sender, + state_key: UserId::try_from(event.state_key)?, + unsigned: event.unsigned, + }) + } +} + +impl Into> for MemberEvent { + fn into(self) -> SyncStateEvent { + SyncStateEvent { + content: self.content, + event_id: self.event_id, + sender: self.sender, + origin_server_ts: self.origin_server_ts, + state_key: self.state_key.to_string(), + prev_content: self.prev_content, + unsigned: self.unsigned, + } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde( + try_from = "StrippedStateEvent", + into = "StrippedStateEvent" +)] +pub struct StrippedMemberEvent { + pub content: MemberEventContent, + pub sender: UserId, + pub state_key: UserId, +} + +impl TryFrom> for StrippedMemberEvent { + type Error = matrix_sdk_common::identifiers::Error; + + fn try_from(event: StrippedStateEvent) -> Result { + Ok(StrippedMemberEvent { + content: event.content, + sender: event.sender, + state_key: UserId::try_from(event.state_key)?, + }) + } +} + +impl Into> for StrippedMemberEvent { + fn into(self) -> StrippedStateEvent { + StrippedStateEvent { + content: self.content, + sender: self.sender, + state_key: self.state_key.to_string(), + } + } +} diff --git a/matrix_sdk_base/src/rooms/normal.rs b/matrix_sdk_base/src/rooms/normal.rs index ed56681c..f5fc5fc7 100644 --- a/matrix_sdk_base/src/rooms/normal.rs +++ b/matrix_sdk_base/src/rooms/normal.rs @@ -367,8 +367,4 @@ impl RoomInfo { changed } - - pub fn serialize(&self) -> Vec { - serde_json::to_vec(&self).unwrap() - } } diff --git a/matrix_sdk_base/src/rooms/stripped.rs b/matrix_sdk_base/src/rooms/stripped.rs index 07dcda0a..93306b64 100644 --- a/matrix_sdk_base/src/rooms/stripped.rs +++ b/matrix_sdk_base/src/rooms/stripped.rs @@ -12,22 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ - convert::TryFrom, - sync::{Arc, Mutex as SyncMutex}, -}; +use std::sync::{Arc, Mutex as SyncMutex}; -use futures::stream::{Stream, StreamExt}; use matrix_sdk_common::{ - events::{room::encryption::EncryptionEventContent, AnyStrippedStateEvent, EventType}, + events::{room::encryption::EncryptionEventContent, AnyStrippedStateEvent}, identifiers::{RoomAliasId, RoomId, UserId}, }; use serde::{Deserialize, Serialize}; use crate::store::Store; -use super::RoomMember; - #[derive(Debug, Clone)] pub struct StrippedRoom { room_id: Arc, diff --git a/matrix_sdk_base/src/store.rs b/matrix_sdk_base/src/store.rs index 15cdb82b..c941dc75 100644 --- a/matrix_sdk_base/src/store.rs +++ b/matrix_sdk_base/src/store.rs @@ -1,10 +1,11 @@ -use std::{collections::BTreeMap, convert::TryFrom, path::Path}; +use std::{collections::BTreeMap, convert::TryFrom, path::Path, time::SystemTime}; use futures::stream::{self, Stream}; use matrix_sdk_common::{ events::{ - presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent, - AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType, StrippedStateEvent, + presence::PresenceEvent, + room::member::{MemberEventContent, MembershipState}, + AnyBasicEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType, SyncStateEvent, }, identifiers::{RoomId, UserId}, @@ -13,7 +14,11 @@ use matrix_sdk_common::{ use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree}; use tracing::info; -use crate::{rooms::RoomInfo, Session}; +use crate::{ + responses::{MemberEvent, StrippedMemberEvent}, + rooms::RoomInfo, + Session, +}; #[derive(Debug, Clone)] pub struct Store { @@ -35,58 +40,22 @@ pub struct Store { #[derive(Debug, Default)] pub struct StateChanges { pub session: Option, - pub members: BTreeMap>>, + pub members: BTreeMap>, pub state: BTreeMap>, pub account_data: BTreeMap, pub room_account_data: BTreeMap>, pub room_infos: BTreeMap, - pub joined_user_ids: BTreeMap>, - pub invited_user_ids: BTreeMap>, - pub removed_user_ids: BTreeMap, pub presence: BTreeMap, pub stripped_state: BTreeMap>, - pub stripped_members: - BTreeMap>>, + pub stripped_members: BTreeMap>, pub invited_room_info: BTreeMap, } impl StateChanges { - pub fn add_joined_member( - &mut self, - room_id: &RoomId, - event: SyncStateEvent, - ) { - let user_id = UserId::try_from(event.state_key.as_str()).unwrap(); - self.joined_user_ids - .entry(room_id.to_owned()) - .or_insert_with(Vec::new) - .push(user_id.clone()); - self.members - .entry(room_id.to_owned()) - .or_insert_with(BTreeMap::new) - .insert(user_id, event); - } - pub fn add_presence_event(&mut self, event: PresenceEvent) { self.presence.insert(event.sender.clone(), event); } - pub fn add_invited_member( - &mut self, - room_id: &RoomId, - event: SyncStateEvent, - ) { - let user_id = UserId::try_from(event.state_key.as_str()).unwrap(); - self.invited_user_ids - .entry(room_id.to_owned()) - .or_insert_with(Vec::new) - .push(user_id.clone()); - self.members - .entry(room_id.to_owned()) - .or_insert_with(BTreeMap::new) - .insert(user_id, event); - } - pub fn add_room(&mut self, room: RoomInfo) { self.room_infos .insert(room.room_id.as_ref().to_owned(), room); @@ -111,11 +80,7 @@ impl StateChanges { .insert(event.state_key().to_string(), event); } - pub fn add_stripped_member( - &mut self, - room_id: &RoomId, - event: StrippedStateEvent, - ) { + pub fn add_stripped_member(&mut self, room_id: &RoomId, event: StrippedMemberEvent) { let user_id = UserId::try_from(event.state_key.as_str()).unwrap(); self.stripped_members .entry(room_id.to_owned()) @@ -129,13 +94,6 @@ impl StateChanges { .or_insert_with(BTreeMap::new) .insert(event.content().event_type().to_string(), event); } - - pub fn from_event(room_id: &RoomId, event: SyncStateEvent) -> Self { - let mut changes = Self::default(); - changes.add_joined_member(room_id, event); - - changes - } } impl From for StateChanges { @@ -209,6 +167,8 @@ impl Store { } pub async fn save_changes(&self, changes: &StateChanges) { + let now = SystemTime::now(); + let ret: TransactionResult<()> = ( &self.session, &self.account_data, @@ -244,6 +204,23 @@ impl Store { for (room, events) in &changes.members { for (_, event) in events { + let key = format!("{}{}", room.as_str(), event.state_key.as_str()); + + match event.content.membership { + MembershipState::Join => { + joined.insert(key.as_str(), event.state_key.as_str())?; + invited.remove(key.as_str())?; + } + MembershipState::Invite => { + invited.insert(key.as_str(), event.state_key.as_str())?; + joined.remove(key.as_str())?; + } + _ => { + joined.remove(key.as_str())?; + invited.remove(key.as_str())?; + } + } + members.insert( format!("{}{}", room.as_str(), &event.state_key).as_str(), serde_json::to_vec(&event).unwrap(), @@ -265,24 +242,6 @@ impl Store { } } - for (room, users) in &changes.joined_user_ids { - for user in users { - let key = format!("{}{}", room.as_str(), user.as_str()); - info!("SAVING joined {}", &key); - joined.insert(key.as_bytes(), user.as_bytes())?; - invited.remove(key.as_bytes())?; - } - } - - for (room, users) in &changes.invited_user_ids { - for user in users { - let key = format!("{}{}", room.as_str(), user.as_str()); - info!("SAVING invited {}", &key); - invited.insert(key.as_bytes(), user.as_bytes())?; - joined.remove(key.as_bytes())?; - } - } - for (room, events) in &changes.state { for (_, event) in events { state.insert( @@ -299,7 +258,8 @@ impl Store { } for (room_id, summary) in &changes.room_infos { - summaries.insert(room_id.as_bytes(), summary.serialize())?; + summaries + .insert(room_id.as_bytes(), serde_json::to_vec(summary).unwrap())?; } for (sender, event) in &changes.presence { @@ -342,6 +302,8 @@ impl Store { ret.unwrap(); self.inner.flush_async().await.unwrap(); + + info!("Saved changes in {:?}", now.elapsed().unwrap()); } pub async fn get_presence_event(&self, user_id: &UserId) -> Option {