From cd3d90df3f1734df86b23cd5e18c8ecff28688dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 19 Oct 2020 19:11:12 +0200 Subject: [PATCH] base: Remove a bunch of stuff and add sled. --- matrix_sdk_base/Cargo.toml | 1 + matrix_sdk_base/src/client.rs | 1973 +---------------------- matrix_sdk_base/src/lib.rs | 15 - matrix_sdk_base/src/state/json_store.rs | 188 +-- matrix_sdk_base/src/state/mod.rs | 21 - 5 files changed, 39 insertions(+), 2159 deletions(-) diff --git a/matrix_sdk_base/Cargo.toml b/matrix_sdk_base/Cargo.toml index 7d8de457..720adf17 100644 --- a/matrix_sdk_base/Cargo.toml +++ b/matrix_sdk_base/Cargo.toml @@ -36,6 +36,7 @@ matrix-sdk-crypto = { version = "0.1.0", path = "../matrix_sdk_crypto", optional # Misc dependencies thiserror = "1.0.21" +sled = "*" [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] version = "0.2.22" diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index 6557a69b..3d39e7b8 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -14,26 +14,24 @@ // limitations under the License. use std::{ - collections::HashMap, fmt, - ops::Deref, path::{Path, PathBuf}, result::Result as StdResult, sync::Arc, }; +use sled::{self, Config as SledConfig, Db as Sled}; + #[cfg(feature = "encryption")] use matrix_sdk_common::locks::Mutex; use matrix_sdk_common::{ api::r0 as api, events::{ - direct::DirectEvent, ignored_user_list::IgnoredUserListEvent, push_rules::PushRulesEvent, - room::member::MemberEventContent, AnyBasicEvent, AnyStrippedStateEvent, - AnySyncEphemeralRoomEvent, AnySyncMessageEvent, AnySyncRoomEvent, AnySyncStateEvent, + room::member::MemberEventContent, AnyStrippedStateEvent, AnySyncRoomEvent, + AnySyncStateEvent, }, identifiers::{RoomId, UserId}, locks::RwLock, - push::Ruleset, Raw, }; #[cfg(feature = "encryption")] @@ -51,18 +49,7 @@ use matrix_sdk_crypto::{ }; use zeroize::Zeroizing; -#[cfg(not(target_arch = "wasm32"))] -use crate::JsonStore; - -use crate::{ - error::Result, - event_emitter::CustomEvent, - events::presence::PresenceEvent, - models::Room, - session::Session, - state::{AllRooms, ClientState, StateStore}, - EventEmitter, -}; +use crate::{error::Result, session::Session}; pub type Token = String; @@ -182,25 +169,8 @@ pub struct BaseClient { session: Arc>>, /// The current sync token that should be used for the next sync call. pub(crate) sync_token: Arc>>, - /// A map of the rooms our user is joined in. - joined_rooms: Arc>>>>, - /// A map of the rooms our user is invited to. - invited_rooms: Arc>>>>, - /// A map of the rooms our user has left. - left_rooms: Arc>>>>, - /// A list of ignored users. - pub(crate) ignored_users: Arc>>, - /// The push ruleset for the logged in user. - pub(crate) push_ruleset: Arc>>, - /// Any implementor of EventEmitter will act as the callbacks for various - /// events. - event_emitter: Arc>>>, - /// Any implementor of `StateStore` will be called to save `Room` and - /// some `BaseClient` state after receiving a sync response. - /// - /// There is a default implementation `JsonStore` that saves JSON to disk. - state_store: Arc>>>, - + /// Database + sled: Sled, #[cfg(feature = "encryption")] olm: Arc>>, #[cfg(feature = "encryption")] @@ -215,10 +185,6 @@ impl fmt::Debug for BaseClient { f.debug_struct("Client") .field("session", &self.session) .field("sync_token", &self.sync_token) - .field("joined_rooms", &self.joined_rooms) - .field("ignored_users", &self.ignored_users) - .field("push_ruleset", &self.push_ruleset) - .field("event_emitter", &"EventEmitter<...>") .finish() } } @@ -236,7 +202,6 @@ impl fmt::Debug for BaseClient { /// ``` #[derive(Default)] pub struct BaseClientConfig { - state_store: Option>, #[cfg(feature = "encryption")] crypto_store: Option>, store_path: Option, @@ -256,14 +221,6 @@ impl BaseClientConfig { Default::default() } - /// Set a custom implementation of a `StateStore`. - /// - /// The state store should be opened before being set. - pub fn state_store(mut self, store: Box) -> Self { - self.state_store = Some(store); - self - } - /// Set a custom implementation of a `CryptoStore`. /// /// The crypto store should be opened before being set. @@ -319,13 +276,7 @@ impl BaseClient { Ok(BaseClient { session: Arc::new(RwLock::new(None)), sync_token: Arc::new(RwLock::new(None)), - joined_rooms: Arc::new(RwLock::new(HashMap::new())), - invited_rooms: Arc::new(RwLock::new(HashMap::new())), - left_rooms: Arc::new(RwLock::new(HashMap::new())), - ignored_users: Arc::new(RwLock::new(Vec::new())), - push_ruleset: Arc::new(RwLock::new(None)), - event_emitter: Arc::new(RwLock::new(None)), - state_store: Arc::new(RwLock::new(config.state_store)), + sled: SledConfig::new().temporary(true).open().unwrap(), #[cfg(feature = "encryption")] olm: Arc::new(Mutex::new(None)), #[cfg(feature = "encryption")] @@ -352,91 +303,6 @@ impl BaseClient { self.session.read().await.is_some() } - /// Add `EventEmitter` to `Client`. - /// - /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. - pub async fn add_event_emitter(&self, emitter: Box) { - *self.event_emitter.write().await = Some(emitter); - } - - /// When a client is provided the state store will load state from the `StateStore`. - /// - /// Returns `true` when a state store sync has successfully completed. - async fn sync_with_state_store(&self, session: &Session) -> Result { - let store = self.state_store.read().await; - - let loaded = if let Some(store) = store.as_ref() { - if let Some(client_state) = store.load_client_state(session).await? { - let ClientState { - sync_token, - ignored_users, - push_ruleset, - } = client_state; - *self.sync_token.write().await = sync_token; - *self.ignored_users.write().await = ignored_users; - *self.push_ruleset.write().await = push_ruleset; - } else { - // return false and continues with a sync request then save the state and create - // and populate the files during the sync - return Ok(false); - } - - let AllRooms { - mut joined, - mut invited, - mut left, - } = store.load_all_rooms().await?; - - *self.joined_rooms.write().await = joined - .drain() - .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) - .collect(); - - *self.invited_rooms.write().await = invited - .drain() - .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) - .collect(); - - *self.left_rooms.write().await = left - .drain() - .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) - .collect(); - - true - } else { - false - }; - - Ok(loaded) - } - - /// When a client is provided the state store will load state from the `StateStore`. - /// - /// Returns `true` when a state store sync has successfully completed. - pub async fn store_room_state(&self, room_id: &RoomId) -> Result<()> { - if let Some(store) = self.state_store.read().await.as_ref() { - if let Some(room) = self.get_joined_room(room_id).await { - let room = room.read().await; - store - .store_room_state(RoomState::Joined(room.deref())) - .await?; - } - if let Some(room) = self.get_invited_room(room_id).await { - let room = room.read().await; - store - .store_room_state(RoomState::Invited(room.deref())) - .await?; - } - if let Some(room) = self.get_left_room(room_id).await { - let room = room.read().await; - store - .store_room_state(RoomState::Left(room.deref())) - .await?; - } - } - Ok(()) - } - /// Receive a login response and update the session of the client. /// /// # Arguments @@ -464,15 +330,15 @@ impl BaseClient { pub async fn restore_login(&self, session: Session) -> Result<()> { // If there wasn't a state store opened, try to open the default one if // a store path was provided. - if self.state_store.read().await.is_none() { - #[cfg(not(target_arch = "wasm32"))] - if let Some(path) = &*self.store_path { - let store = JsonStore::open(path)?; - *self.state_store.write().await = Some(Box::new(store)); - } - } + // if self.state_store.read().await.is_none() { + // #[cfg(not(target_arch = "wasm32"))] + // if let Some(path) = &*self.store_path { + // let store = JsonStore::open(path)?; + // *self.state_store.write().await = Some(Box::new(store)); + // } + // } - self.sync_with_state_store(&session).await?; + // self.sync_with_state_store(&session).await?; #[cfg(feature = "encryption")] { @@ -517,438 +383,6 @@ impl BaseClient { Ok(()) } - pub(crate) async fn get_or_create_joined_room( - &self, - room_id: &RoomId, - ) -> Result>> { - // If this used to be an invited or left room remove them from our other - // hashmaps. - if self.invited_rooms.write().await.remove(room_id).is_some() { - if let Some(store) = self.state_store.read().await.as_ref() { - store.delete_room_state(RoomState::Invited(room_id)).await?; - } - } - - if self.left_rooms.write().await.remove(room_id).is_some() { - if let Some(store) = self.state_store.read().await.as_ref() { - store.delete_room_state(RoomState::Left(room_id)).await?; - } - } - - let mut rooms = self.joined_rooms.write().await; - #[allow(clippy::or_fun_call)] - Ok(rooms - .entry(room_id.clone()) - .or_insert(Arc::new(RwLock::new(Room::new( - room_id, - &self - .session - .read() - .await - .as_ref() - .expect("Receiving events while not being logged in") - .user_id, - )))) - .clone()) - } - - /// Get a joined room with the given room id. - /// - /// # Arguments - /// - /// `room_id` - The unique id of the room that should be fetched. - pub async fn get_joined_room(&self, room_id: &RoomId) -> Option>> { - self.joined_rooms.read().await.get(room_id).cloned() - } - - /// Returns the joined rooms this client knows about. - /// - /// A `HashMap` of room id to `matrix::models::Room` - pub fn joined_rooms(&self) -> Arc>>>> { - self.joined_rooms.clone() - } - - pub(crate) async fn get_or_create_invited_room( - &self, - room_id: &RoomId, - ) -> Result>> { - // Remove the left rooms only here, since a join -> invite action per - // spec can't happen. - if self.left_rooms.write().await.remove(room_id).is_some() { - if let Some(store) = self.state_store.read().await.as_ref() { - store.delete_room_state(RoomState::Left(room_id)).await?; - } - } - - let mut rooms = self.invited_rooms.write().await; - #[allow(clippy::or_fun_call)] - Ok(rooms - .entry(room_id.clone()) - .or_insert(Arc::new(RwLock::new(Room::new( - room_id, - &self - .session - .read() - .await - .as_ref() - .expect("Receiving events while not being logged in") - .user_id, - )))) - .clone()) - } - - /// Get an invited room with the given room id. - /// - /// # Arguments - /// - /// `room_id` - The unique id of the room that should be fetched. - pub async fn get_invited_room(&self, room_id: &RoomId) -> Option>> { - self.invited_rooms.read().await.get(room_id).cloned() - } - - /// Returns the invited rooms this client knows about. - /// - /// A `HashMap` of room id to `matrix::models::Room` - pub fn invited_rooms(&self) -> Arc>>>> { - self.invited_rooms.clone() - } - - pub(crate) async fn get_or_create_left_room( - &self, - room_id: &RoomId, - ) -> Result>> { - // If this used to be an invited or joined room remove them from our other - // hashmaps. - if self.invited_rooms.write().await.remove(room_id).is_some() { - if let Some(store) = self.state_store.read().await.as_ref() { - store.delete_room_state(RoomState::Invited(room_id)).await?; - } - } - - if self.joined_rooms.write().await.remove(room_id).is_some() { - if let Some(store) = self.state_store.read().await.as_ref() { - store.delete_room_state(RoomState::Joined(room_id)).await?; - } - } - - let mut rooms = self.left_rooms.write().await; - #[allow(clippy::or_fun_call)] - Ok(rooms - .entry(room_id.clone()) - .or_insert(Arc::new(RwLock::new(Room::new( - room_id, - &self - .session - .read() - .await - .as_ref() - .expect("Receiving events while not being logged in") - .user_id, - )))) - .clone()) - } - - /// Get an left room with the given room id. - /// - /// # Arguments - /// - /// `room_id` - The unique id of the room that should be fetched. - pub async fn get_left_room(&self, room_id: &RoomId) -> Option>> { - self.left_rooms.read().await.get(room_id).cloned() - } - - /// Returns the left rooms this client knows about. - /// - /// A `HashMap` of room id to `matrix::models::Room` - pub fn left_rooms(&self) -> Arc>>>> { - self.left_rooms.clone() - } - - /// Handle a m.ignored_user_list event, updating the room state if necessary. - /// - /// Returns true if the room name changed, false otherwise. - pub(crate) async fn handle_ignored_users(&self, event: &IgnoredUserListEvent) -> bool { - // this avoids cloning every UserId for the eq check - if self.ignored_users.read().await.iter().collect::>() - == event.content.ignored_users.iter().collect::>() - { - false - } else { - *self.ignored_users.write().await = event.content.ignored_users.to_vec(); - true - } - } - - /// Handle a m.ignored_user_list event, updating the room state if necessary. - /// - /// Returns true if the room name changed, false otherwise. - pub(crate) async fn handle_push_rules(&self, event: &PushRulesEvent) -> bool { - // TODO this is basically a stub - // TODO ruma removed PartialEq for evens, so this doesn't work anymore. - // Returning always true for now should be ok here since those don't - // change often. - // if self.push_ruleset.as_ref() == Some(&event.content.global) { - // false - // } else { - *self.push_ruleset.write().await = Some(event.content.global.clone()); - true - // } - } - - /// Handle a m.direct event, updating rooms states if necessary. - /// - /// Returns true if any room changed, false otherwise. - pub(crate) async fn handle_direct(&self, event: &DirectEvent) -> Vec>> { - let mut updated_rooms = vec![]; - - for (user_id, rooms) in event.content.iter() { - for room_id in rooms.iter() { - if let Some(room) = &self.get_joined_room(room_id).await { - let mut room_locked = room.write().await; - if room_locked.handle_direct(user_id) { - updated_rooms.push(room.to_owned()); - } - } - } - } - updated_rooms - } - - /// Receive a timeline event for a joined room and update the client state. - /// - /// Returns a bool, true when the `Room` state has been updated. - /// - /// This will in-place replace the event with a decrypted one if the - /// encryption feature is turned on, the event is encrypted and if we - /// successfully decrypted the event. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - The event that should be handled by the client. - pub async fn receive_joined_timeline_event( - &self, - room_id: &RoomId, - event: &mut Raw, - ) -> Result { - match event.deserialize() { - #[allow(unused_mut)] - Ok(mut e) => { - #[cfg(feature = "encryption")] - if let AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomEncrypted( - ref mut encrypted_event, - )) = e - { - let olm = self.olm.lock().await; - - if let Some(o) = &*olm { - if let Ok(decrypted) = o.decrypt_room_event(&encrypted_event, room_id).await - { - if let Ok(d) = decrypted.deserialize() { - e = d - } - *event = decrypted; - } - } - } - - let room_lock = self.get_or_create_joined_room(&room_id).await?; - let mut room = room_lock.write().await; - - if let AnySyncRoomEvent::State(AnySyncStateEvent::RoomMember(mem_event)) = &mut e { - let (changed, _) = room.handle_membership(mem_event, false); - - // The memberlist of the room changed, invalidate the group session - // of the room. - if changed { - #[cfg(feature = "encryption")] - self.invalidate_group_session(room_id).await; - } - - Ok(changed) - } else { - Ok(room.receive_timeline_event(&e)) - } - } - _ => Ok(false), - } - } - - /// Receive a state event for a joined room and update the client state. - /// - /// Returns true if the state of the room changed, false - /// otherwise. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - The event that should be handled by the client. - pub async fn receive_joined_state_event( - &self, - room_id: &RoomId, - event: &AnySyncStateEvent, - ) -> Result { - let room_lock = self.get_or_create_joined_room(room_id).await?; - let mut room = room_lock.write().await; - - if let AnySyncStateEvent::RoomMember(e) = event { - let (changed, _) = room.handle_membership(e, true); - - // The memberlist of the room changed, invalidate the group session - // of the room. - if changed { - #[cfg(feature = "encryption")] - self.invalidate_group_session(room_id).await; - } - - Ok(changed) - } else { - Ok(room.receive_state_event(event)) - } - } - - /// Receive a state event for a room the user has been invited to. - /// - /// Returns true if the state of the room changed, false - /// otherwise. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - A `AnyStrippedStateEvent` that should be handled by the client. - pub async fn receive_invite_state_event( - &self, - room_id: &RoomId, - event: &AnyStrippedStateEvent, - ) -> Result { - let room_lock = self.get_or_create_invited_room(room_id).await?; - let mut room = room_lock.write().await; - Ok(room.receive_stripped_state_event(event)) - } - - /// Receive a timeline event for a room the user has left and update the client state. - /// - /// Returns a tuple of the successfully decrypted event, or None on failure and - /// a bool, true when the `Room` state has been updated. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - The event that should be handled by the client. - pub async fn receive_left_timeline_event( - &self, - room_id: &RoomId, - event: &Raw, - ) -> Result { - match event.deserialize() { - Ok(e) => { - let room_lock = self.get_or_create_left_room(room_id).await?; - let mut room = room_lock.write().await; - Ok(room.receive_timeline_event(&e)) - } - _ => Ok(false), - } - } - - /// Receive a state event for a room the user has left and update the client state. - /// - /// Returns true if the state of the room changed, false - /// otherwise. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - The event that should be handled by the client. - pub async fn receive_left_state_event( - &self, - room_id: &RoomId, - event: &AnySyncStateEvent, - ) -> Result { - let room_lock = self.get_or_create_left_room(room_id).await?; - let mut room = room_lock.write().await; - Ok(room.receive_state_event(event)) - } - - /// Receive a presence event from a sync response and updates the client state. - /// - /// Returns true if the state of the room changed, false - /// otherwise. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - The event that should be handled by the client. - pub async fn receive_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) -> bool { - // this should be the room that was just created in the `Client::sync` loop. - if let Some(room) = self.get_joined_room(room_id).await { - let mut room = room.write().await; - room.receive_presence_event(event) - } else { - false - } - } - - /// Receive an account data event associated to a room from a sync - /// response and updates the client state. - /// - /// Returns true if the state of the `Room` has changed, false otherwise. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - The presence event for a specified room member. - pub async fn receive_room_account_data_event(&self, _: &RoomId, event: &AnyBasicEvent) -> bool { - match event { - AnyBasicEvent::IgnoredUserList(event) => self.handle_ignored_users(event).await, - AnyBasicEvent::PushRules(event) => self.handle_push_rules(event).await, - _ => false, - } - } - - /// Receive an account data event from a sync response and updates - /// the client state. - /// - /// Returns true if the state of any room has changed, false otherwise. - /// - /// # Arguments - /// - /// * `event` - The presence event for a specified room member. - pub async fn receive_account_data_event( - &self, - event: &AnyBasicEvent, - ) -> Vec>> { - match event { - AnyBasicEvent::Direct(event) => self.handle_direct(event).await, - _ => vec![], - } - } - - /// Receive an ephemeral event from a sync response and updates the client state. - /// - /// Returns true if the state of the `Room` has changed, false otherwise. - /// - /// # Arguments - /// - /// * `room_id` - The unique id of the room the event belongs to. - /// - /// * `event` - The presence event for a specified room member. - pub async fn receive_ephemeral_event(&self, event: &AnySyncEphemeralRoomEvent) -> bool { - match event { - AnySyncEphemeralRoomEvent::FullyRead(_) => {} - AnySyncEphemeralRoomEvent::Receipt(_) => {} - AnySyncEphemeralRoomEvent::Typing(_) => {} - _ => {} - }; - false - } - /// Get the current, if any, sync token of the client. /// This will be None if the client didn't sync at least once. pub async fn sync_token(&self) -> Option { @@ -986,306 +420,9 @@ impl BaseClient { } } - // when events change state, updated_* signals to StateStore to update database - self.iter_joined_rooms(response).await?; - self.iter_invited_rooms(response).await?; - self.iter_left_rooms(response).await?; - self.iter_account_data(response).await?; - - let store = self.state_store.read().await; - - // Store now the new sync token an other client specific state. Since we - // know the sync token changed we can assume that this needs to be done - // always. - if let Some(store) = store.as_ref() { - let state = ClientState::from_base_client(&self).await; - store.store_client_state(state).await?; - } - Ok(()) } - async fn iter_joined_rooms( - &self, - response: &mut api::sync::sync_events::Response, - ) -> Result { - let mut updated = false; - for (room_id, joined_room) in &mut response.rooms.join { - let matrix_room = { - for event in &mut joined_room.state.events { - // XXX: Related to `prev_content` and `unsigned`; see the doc comment of - // `hoist_room_event_prev_content` - if let Some(e) = hoist_state_event_prev_content(event) { - *event = e; - } - - if let Ok(e) = event.deserialize() { - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - if self.receive_joined_state_event(&room_id, &e).await? { - updated = true; - } - self.emit_state_event(&room_id, &e, RoomStateType::Joined) - .await; - } - } - - self.get_or_create_joined_room(&room_id).await?.clone() - }; - - // RoomSummary contains information for calculating room name. - matrix_room - .write() - .await - .set_room_summary(&joined_room.summary); - - // Set unread notification count. - matrix_room - .write() - .await - .set_unread_notice_count(&joined_room.unread_notifications); - - for mut event in &mut joined_room.timeline.events { - // XXX: Related to `prev_content` and `unsigned`; see the doc comment of - // `hoist_room_event_prev_content` - if let Some(e) = hoist_room_event_prev_content(event) { - *event = e; - } - - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - let timeline_update = self - .receive_joined_timeline_event(room_id, &mut event) - .await?; - if timeline_update { - updated = true; - }; - - if let Ok(e) = event.deserialize() { - self.emit_timeline_event(&room_id, &e, RoomStateType::Joined) - .await; - } else { - self.emit_unrecognized_event(&room_id, &event, RoomStateType::Joined) - .await; - } - } - - #[cfg(feature = "encryption")] - { - let olm = self.olm.lock().await; - - if let Some(o) = &*olm { - let room = matrix_room.read().await; - - // If the room is encrypted, update the tracked users. - if room.is_encrypted() { - o.update_tracked_users(room.joined_members.keys()).await; - o.update_tracked_users(room.invited_members.keys()).await; - } - } - } - - // look at AccountData to further cut down users by collecting ignored users - for account_data in &joined_room.account_data.events { - { - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - if let Ok(e) = account_data.deserialize() { - if self.receive_room_account_data_event(&room_id, &e).await { - updated = true; - } - self.emit_account_data_event(room_id, &e, RoomStateType::Joined) - .await; - } - } - } - - // After the room has been created and state/timeline events accounted for we use the room_id of the newly created - // room to add any presence events that relate to a user in the current room. This is not super - // efficient but we need a room_id so we would loop through now or later. - for presence in &mut response.presence.events { - { - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - if let Ok(e) = presence.deserialize() { - if self.receive_presence_event(&room_id, &e).await { - updated = true; - } - - self.emit_presence_event(&room_id, &e, RoomStateType::Joined) - .await; - } - } - } - - for ephemeral in &mut joined_room.ephemeral.events { - { - if let Ok(e) = ephemeral.deserialize() { - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - if self.receive_ephemeral_event(&e).await { - updated = true; - } - - self.emit_ephemeral_event(&room_id, &e, RoomStateType::Joined) - .await; - } - } - } - - if updated { - if let Some(store) = self.state_store.read().await.as_ref() { - store - .store_room_state(RoomState::Joined(matrix_room.read().await.deref())) - .await?; - } - } - } - Ok(updated) - } - - async fn iter_left_rooms( - &self, - response: &mut api::sync::sync_events::Response, - ) -> Result { - let mut updated = false; - for (room_id, left_room) in &mut response.rooms.leave { - let matrix_room = { - for event in &mut left_room.state.events { - // XXX: Related to `prev_content` and `unsigned`; see the doc comment of - // `hoist_room_event_prev_content` - if let Some(e) = hoist_state_event_prev_content(event) { - *event = e; - } - - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - if let Ok(e) = event.deserialize() { - if self.receive_left_state_event(&room_id, &e).await? { - updated = true; - } - } - } - - self.get_or_create_left_room(&room_id).await?.clone() - }; - - for event in &mut left_room.state.events { - if let Ok(e) = event.deserialize() { - self.emit_state_event(&room_id, &e, RoomStateType::Left) - .await; - } - } - - for event in &mut left_room.timeline.events { - // XXX: Related to `prev_content` and `unsigned`; see the doc comment of - // `hoist_room_event_prev_content` - if let Some(e) = hoist_room_event_prev_content(event) { - *event = e; - } - - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - if self.receive_left_timeline_event(room_id, &event).await? { - updated = true; - }; - - if let Ok(e) = event.deserialize() { - self.emit_timeline_event(&room_id, &e, RoomStateType::Left) - .await; - } - } - - if updated { - if let Some(store) = self.state_store.read().await.as_ref() { - store - .store_room_state(RoomState::Left(matrix_room.read().await.deref())) - .await?; - } - } - } - Ok(updated) - } - - async fn iter_account_data( - &self, - response: &mut api::sync::sync_events::Response, - ) -> Result { - let mut updated = false; - for account_data in &response.account_data.events { - { - // FIXME: emit_account_data_event assumes a room is given - if let Ok(e) = account_data.deserialize() { - for room in self.receive_account_data_event(&e).await { - if let Some(store) = self.state_store.read().await.as_ref() { - // FIXME: currently only operate on Joined rooms - store - .store_room_state(RoomState::Joined(room.read().await.deref())) - .await?; - } - updated = true; - } - } - } - } - Ok(updated) - } - - async fn iter_invited_rooms( - &self, - response: &api::sync::sync_events::Response, - ) -> Result { - let mut updated = false; - for (room_id, invited_room) in &response.rooms.invite { - let matrix_room = { - for event in &invited_room.invite_state.events { - if let Ok(e) = event.deserialize() { - // FIXME: receive_* and emit_* methods shouldn't be called in parallel. We - // should only pass events to receive_* methods and then let *them* emit. - if self.receive_invite_state_event(&room_id, &e).await? { - updated = true; - } - } - } - - self.get_or_create_invited_room(&room_id).await?.clone() - }; - - for event in &invited_room.invite_state.events { - if let Ok(mut e) = event.deserialize() { - // if the event is a m.room.member event the server will sometimes - // send the `prev_content` field as part of the unsigned field. - if let AnyStrippedStateEvent::RoomMember(_) = &mut e { - if let Some(raw_content) = stripped_deserialize_prev_content(event) { - let prev_content = raw_content - .prev_content - .and_then(|json| json.deserialize().ok()); - self.emit_stripped_state_event( - &room_id, - &e, - prev_content, - RoomStateType::Invited, - ) - .await; - continue; - } - } - self.emit_stripped_state_event(&room_id, &e, None, RoomStateType::Invited) - .await; - } - } - - if updated { - if let Some(store) = self.state_store.read().await.as_ref() { - store - .store_room_state(RoomState::Invited(matrix_room.read().await.deref())) - .await?; - } - } - } - Ok(updated) - } - /// Should the client share a group session for the given room. /// /// Returns true if a session needs to be shared before room messages can be @@ -1368,27 +505,28 @@ impl BaseClient { #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub async fn share_group_session(&self, room_id: &RoomId) -> Result>> { - let room = self.get_joined_room(room_id).await.expect("No room found"); - let olm = self.olm.lock().await; + todo!() + // let room = self.get_joined_room(room_id).await.expect("No room found"); + // let olm = self.olm.lock().await; - match &*olm { - Some(o) => { - let room = room.write().await; + // match &*olm { + // Some(o) => { + // let room = room.write().await; - // XXX: We construct members in a slightly roundabout way instead of chaining the - // iterators directly because of https://github.com/rust-lang/rust/issues/64552 - let joined_members = room.joined_members.keys(); - let invited_members = room.joined_members.keys(); - let members: Vec<&UserId> = joined_members.chain(invited_members).collect(); - Ok(o.share_group_session( - room_id, - members.into_iter(), - room.encrypted.clone().unwrap_or_default(), - ) - .await?) - } - None => panic!("Olm machine wasn't started"), - } + // // XXX: We construct members in a slightly roundabout way instead of chaining the + // // iterators directly because of https://github.com/rust-lang/rust/issues/64552 + // let joined_members = room.joined_members.keys(); + // let invited_members = room.joined_members.keys(); + // let members: Vec<&UserId> = joined_members.chain(invited_members).collect(); + // Ok(o.share_group_session( + // room_id, + // members.into_iter(), + // room.encrypted.clone().unwrap_or_default(), + // ) + // .await?) + // } + // None => panic!("Olm machine wasn't started"), + // } } /// Encrypt a message event content. @@ -1423,406 +561,6 @@ impl BaseClient { } } - pub(crate) async fn emit_timeline_event( - &self, - room_id: &RoomId, - event: &AnySyncRoomEvent, - room_state: RoomStateType, - ) { - let lock = self.event_emitter.read().await; - let event_emitter = if let Some(ee) = lock.as_ref() { - ee - } else { - return; - }; - - let room = match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id).await { - RoomState::Invited(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id).await { - RoomState::Joined(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id).await { - RoomState::Left(Arc::clone(&room)) - } else { - return; - } - } - }; - - match event { - AnySyncRoomEvent::State(event) => match event { - AnySyncStateEvent::RoomMember(e) => event_emitter.on_room_member(room, e).await, - AnySyncStateEvent::RoomName(e) => event_emitter.on_room_name(room, e).await, - AnySyncStateEvent::RoomCanonicalAlias(e) => { - event_emitter.on_room_canonical_alias(room, e).await - } - AnySyncStateEvent::RoomAliases(e) => event_emitter.on_room_aliases(room, e).await, - AnySyncStateEvent::RoomAvatar(e) => event_emitter.on_room_avatar(room, e).await, - AnySyncStateEvent::RoomPowerLevels(e) => { - event_emitter.on_room_power_levels(room, e).await - } - AnySyncStateEvent::RoomTombstone(e) => { - event_emitter.on_room_tombstone(room, e).await - } - AnySyncStateEvent::RoomJoinRules(e) => { - event_emitter.on_room_join_rules(room, e).await - } - AnySyncStateEvent::Custom(e) => { - event_emitter - .on_custom_event(room, &CustomEvent::State(e)) - .await - } - _ => {} - }, - AnySyncRoomEvent::Message(event) => match event { - AnySyncMessageEvent::RoomMessage(e) => event_emitter.on_room_message(room, e).await, - AnySyncMessageEvent::RoomMessageFeedback(e) => { - event_emitter.on_room_message_feedback(room, e).await - } - AnySyncMessageEvent::RoomRedaction(e) => { - event_emitter.on_room_redaction(room, e).await - } - AnySyncMessageEvent::Custom(e) => { - event_emitter - .on_custom_event(room, &CustomEvent::Message(e)) - .await - } - _ => {} - }, - AnySyncRoomEvent::RedactedState(_event) => {} - AnySyncRoomEvent::RedactedMessage(_event) => {} - } - } - - pub(crate) async fn emit_state_event( - &self, - room_id: &RoomId, - event: &AnySyncStateEvent, - room_state: RoomStateType, - ) { - let lock = self.event_emitter.read().await; - let event_emitter = if let Some(ee) = lock.as_ref() { - ee - } else { - return; - }; - - let room = match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id).await { - RoomState::Invited(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id).await { - RoomState::Joined(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id).await { - RoomState::Left(Arc::clone(&room)) - } else { - return; - } - } - }; - - match event { - AnySyncStateEvent::RoomMember(member) => { - event_emitter.on_state_member(room, &member).await - } - AnySyncStateEvent::RoomName(name) => event_emitter.on_state_name(room, &name).await, - AnySyncStateEvent::RoomCanonicalAlias(canonical) => { - event_emitter - .on_state_canonical_alias(room, &canonical) - .await - } - AnySyncStateEvent::RoomAliases(aliases) => { - event_emitter.on_state_aliases(room, &aliases).await - } - AnySyncStateEvent::RoomAvatar(avatar) => { - event_emitter.on_state_avatar(room, &avatar).await - } - AnySyncStateEvent::RoomPowerLevels(power) => { - event_emitter.on_state_power_levels(room, &power).await - } - AnySyncStateEvent::RoomJoinRules(rules) => { - event_emitter.on_state_join_rules(room, &rules).await - } - AnySyncStateEvent::RoomTombstone(tomb) => { - // TODO make `on_state_tombstone` method - event_emitter.on_room_tombstone(room, &tomb).await - } - AnySyncStateEvent::Custom(custom) => { - event_emitter - .on_custom_event(room, &CustomEvent::State(custom)) - .await - } - _ => {} - } - } - - pub(crate) async fn emit_stripped_state_event( - &self, - room_id: &RoomId, - event: &AnyStrippedStateEvent, - prev_content: Option, - room_state: RoomStateType, - ) { - let lock = self.event_emitter.read().await; - let event_emitter = if let Some(ee) = lock.as_ref() { - ee - } else { - return; - }; - - let room = match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id).await { - RoomState::Invited(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id).await { - RoomState::Joined(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id).await { - RoomState::Left(Arc::clone(&room)) - } else { - return; - } - } - }; - - match event { - AnyStrippedStateEvent::RoomMember(member) => { - event_emitter - .on_stripped_state_member(room, &member, prev_content) - .await - } - AnyStrippedStateEvent::RoomName(name) => { - event_emitter.on_stripped_state_name(room, &name).await - } - AnyStrippedStateEvent::RoomCanonicalAlias(canonical) => { - event_emitter - .on_stripped_state_canonical_alias(room, &canonical) - .await - } - AnyStrippedStateEvent::RoomAliases(aliases) => { - event_emitter - .on_stripped_state_aliases(room, &aliases) - .await - } - AnyStrippedStateEvent::RoomAvatar(avatar) => { - event_emitter.on_stripped_state_avatar(room, &avatar).await - } - AnyStrippedStateEvent::RoomPowerLevels(power) => { - event_emitter - .on_stripped_state_power_levels(room, &power) - .await - } - AnyStrippedStateEvent::RoomJoinRules(rules) => { - event_emitter - .on_stripped_state_join_rules(room, &rules) - .await - } - _ => {} - } - } - - pub(crate) async fn emit_account_data_event( - &self, - room_id: &RoomId, - event: &AnyBasicEvent, - room_state: RoomStateType, - ) { - let lock = self.event_emitter.read().await; - let event_emitter = if let Some(ee) = lock.as_ref() { - ee - } else { - return; - }; - - let room = match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id).await { - RoomState::Invited(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id).await { - RoomState::Joined(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id).await { - RoomState::Left(Arc::clone(&room)) - } else { - return; - } - } - }; - - match event { - AnyBasicEvent::Presence(presence) => { - event_emitter.on_non_room_presence(room, &presence).await - } - AnyBasicEvent::IgnoredUserList(ignored) => { - event_emitter - .on_non_room_ignored_users(room, &ignored) - .await - } - AnyBasicEvent::PushRules(rules) => { - event_emitter.on_non_room_push_rules(room, &rules).await - } - _ => {} - } - } - - pub(crate) async fn emit_ephemeral_event( - &self, - room_id: &RoomId, - event: &AnySyncEphemeralRoomEvent, - room_state: RoomStateType, - ) { - let lock = self.event_emitter.read().await; - let event_emitter = if let Some(ee) = lock.as_ref() { - ee - } else { - return; - }; - - let room = match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id).await { - RoomState::Invited(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id).await { - RoomState::Joined(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id).await { - RoomState::Left(Arc::clone(&room)) - } else { - return; - } - } - }; - - match event { - AnySyncEphemeralRoomEvent::FullyRead(full_read) => { - event_emitter.on_non_room_fully_read(room, full_read).await - } - AnySyncEphemeralRoomEvent::Typing(typing) => { - event_emitter.on_non_room_typing(room, typing).await - } - AnySyncEphemeralRoomEvent::Receipt(receipt) => { - event_emitter.on_non_room_receipt(room, receipt).await - } - _ => {} - } - } - - pub(crate) async fn emit_presence_event( - &self, - room_id: &RoomId, - event: &PresenceEvent, - room_state: RoomStateType, - ) { - let room = match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id).await { - RoomState::Invited(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id).await { - RoomState::Joined(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id).await { - RoomState::Left(Arc::clone(&room)) - } else { - return; - } - } - }; - if let Some(ee) = &self.event_emitter.read().await.as_ref() { - ee.on_presence_event(room, &event).await; - } - } - - pub(crate) async fn emit_unrecognized_event( - &self, - room_id: &RoomId, - event: &Raw, - room_state: RoomStateType, - ) { - let room = match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id).await { - RoomState::Invited(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id).await { - RoomState::Joined(Arc::clone(&room)) - } else { - return; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id).await { - RoomState::Left(Arc::clone(&room)) - } else { - return; - } - } - }; - if let Some(ee) = &self.event_emitter.read().await.as_ref() { - ee.on_unrecognized_event(room, event.json()).await; - } - } - /// Get a `Sas` verification object with the given flow id. /// /// # Arguments @@ -1953,641 +691,4 @@ impl BaseClient { } #[cfg(test)] -mod test { - use serde_json::json; - use std::convert::TryFrom; - - #[cfg(feature = "messages")] - use crate::{ - events::AnySyncRoomEvent, identifiers::event_id, BaseClientConfig, JsonStore, Raw, - }; - use crate::{BaseClient, Session}; - - use matrix_sdk_common::identifiers::{room_id, user_id, RoomId}; - - use matrix_sdk_common_macros::async_trait; - use matrix_sdk_test::{async_test, test_json, EventBuilder, EventsJson}; - #[cfg(not(target_arch = "wasm32"))] - use tempfile::tempdir; - - #[cfg(target_arch = "wasm32")] - use wasm_bindgen_test::*; - - async fn get_client() -> BaseClient { - let session = Session { - access_token: "1234".to_owned(), - user_id: user_id!("@example:localhost"), - device_id: "DEVICEID".into(), - }; - let client = BaseClient::new().unwrap(); - client.restore_login(session).await.unwrap(); - client - } - - fn get_room_id() -> RoomId { - room_id!("!SVkFJHzfwvuaIEawgC:localhost") - } - - fn member_event() -> serde_json::Value { - json!({ - "content": { - "displayname": "example", - "membership": "join" - }, - "event_id": "$151800140517rfvjc:localhost", - "membership": "join", - "origin_server_ts": 0, - "sender": "@example:localhost", - "state_key": "@example:localhost", - "type": "m.room.member" - }) - } - - #[async_test] - async fn test_joined_room_creation() { - let mut sync_response = EventBuilder::default() - .add_state_event(EventsJson::Member) - .build_sync_response(); - let client = get_client().await; - let room_id = get_room_id(); - - let room = client.get_joined_room(&room_id).await; - assert!(room.is_none()); - - client - .receive_sync_response(&mut sync_response) - .await - .unwrap(); - - let room = client.get_left_room(&room_id).await; - assert!(room.is_none()); - - let room = client.get_joined_room(&room_id).await; - assert!(room.is_some()); - - let mut sync_response = EventBuilder::default() - .add_custom_left_event(&room_id, member_event()) - .build_sync_response(); - - sync_response.next_batch = "Hello".to_owned(); - - client - .receive_sync_response(&mut sync_response) - .await - .unwrap(); - - let room = client.get_joined_room(&room_id).await; - assert!(room.is_none()); - - let room = client.get_left_room(&room_id).await; - assert!(room.is_some()); - } - - #[async_test] - async fn test_left_room_creation() { - let room_id = room_id!("!left_room:localhost"); - let mut sync_response = EventBuilder::default() - .add_custom_left_event(&room_id, member_event()) - .build_sync_response(); - - let client = get_client().await; - - let room = client.get_left_room(&room_id).await; - assert!(room.is_none()); - - client - .receive_sync_response(&mut sync_response) - .await - .unwrap(); - - let room = client.get_left_room(&room_id).await; - assert!(room.is_some()); - - let mem = member_event(); - - let mut sync_response = EventBuilder::default() - .add_custom_joined_event(&room_id, mem) - .build_sync_response(); - - sync_response.next_batch = "Hello".to_owned(); - - client - .receive_sync_response(&mut sync_response) - .await - .unwrap(); - - let room = client.get_left_room(&room_id).await; - assert!(room.is_none()); - - let room = client.get_joined_room(&room_id).await; - assert!(room.is_some()); - } - - #[async_test] - async fn test_invited_room_creation() { - let room_id = room_id!("!invited_room:localhost"); - let mut sync_response = EventBuilder::default() - .add_custom_invited_event(&room_id, member_event()) - .build_sync_response(); - - let client = get_client().await; - - let room = client.get_invited_room(&room_id).await; - assert!(room.is_none()); - - client - .receive_sync_response(&mut sync_response) - .await - .unwrap(); - - let room = client.get_invited_room(&room_id).await; - assert!(room.is_some()); - - let mut sync_response = EventBuilder::default() - .add_custom_joined_event(&room_id, member_event()) - .build_sync_response(); - - sync_response.next_batch = "Hello".to_owned(); - - client - .receive_sync_response(&mut sync_response) - .await - .unwrap(); - - let room = client.get_invited_room(&room_id).await; - assert!(room.is_none()); - - let room = client.get_joined_room(&room_id).await; - assert!(room.is_some()); - } - - #[async_test] - async fn test_prev_content_from_unsigned() { - use super::*; - - use crate::{EventEmitter, SyncRoom}; - use matrix_sdk_common::{ - events::{ - room::member::{MemberEventContent, MembershipChange}, - SyncStateEvent, - }, - locks::RwLock, - }; - use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }; - - struct EE(Arc); - #[async_trait] - impl EventEmitter for EE { - async fn on_room_member( - &self, - room: SyncRoom, - event: &SyncStateEvent, - ) { - if let SyncRoom::Joined(_) = room { - if let MembershipChange::Joined = event.membership_change() { - self.0.swap(true, Ordering::SeqCst); - } - } - if event.prev_content.is_none() { - self.0.swap(false, Ordering::SeqCst); - } - } - } - - let room_id = get_room_id(); - let user_id = user_id!("@example:localhost"); - - let passed = Arc::new(AtomicBool::default()); - let emitter = EE(Arc::clone(&passed)); - let mut client = get_client().await; - - client.event_emitter = Arc::new(RwLock::new(Some(Box::new(emitter)))); - - // We can't do this through `EventBuilder` since it goes through a de/ser cycle and the - // `prev_content` is lost. Luckily, this test won't be needed once ruma fixes - // `prev_content` parsing. - let join_event: serde_json::Value = serde_json::json!({ - "content": { - "avatar_url": null, - "displayname": "example", - "membership": "join" - }, - "event_id": "$151800140517rfvjc:localhost", - "membership": "join", - "origin_server_ts": 151800140, - "sender": user_id.as_ref(), - "state_key": user_id.as_ref(), - "type": "m.room.member", - "unsigned": { - "age": 297036, - "replaces_state": "$151800111315tsynI:localhost", - "prev_content": { - "avatar_url": null, - "displayname": "example", - "membership": "invite" - } - } - }); - - let display_name_change_event: serde_json::Value = serde_json::json!({ - "content": { - "avatar_url": null, - "displayname": "changed", - "membership": "join" - }, - "event_id": "$191804320221Tallh:localhost", - "membership": "join", - "origin_server_ts": 151800140, - "sender": user_id.as_ref(), - "state_key": user_id.as_ref(), - "type": "m.room.member", - "unsigned": { - "age": 297036, - "replaces_state": "$151800140517rfvjc:localhost", - "prev_content": { - "avatar_url": null, - "displayname": "example", - "membership": "join" - } - } - }); - - let mut joined_rooms: HashMap = HashMap::new(); - let joined_room = serde_json::json!({ - "summary": {}, - "account_data": { - "events": [], - }, - "ephemeral": { - "events": [], - }, - "state": { - "events": [], - }, - "timeline": { - "events": vec![ join_event, display_name_change_event ], - "limited": true, - "prev_batch": "t392-516_47314_0_7_1_1_1_11444_1" - }, - "unread_notifications": { - "highlight_count": 0, - "notification_count": 11 - } - }); - joined_rooms.insert(room_id.clone(), joined_room); - - let empty_room: HashMap = HashMap::new(); - let body = serde_json::json!({ - "device_one_time_keys_count": {}, - "next_batch": "s526_47314_0_7_1_1_1_11444_1", - "device_lists": { - "changed": [], - "left": [] - }, - "rooms": { - "invite": empty_room, - "join": joined_rooms, - "leave": empty_room, - }, - "to_device": { - "events": [] - }, - "presence": { - "events": [] - } - }); - let response = http::Response::builder() - .body(serde_json::to_vec(&body).unwrap()) - .unwrap(); - let mut sync = - matrix_sdk_common::api::r0::sync::sync_events::Response::try_from(response).unwrap(); - - client.receive_sync_response(&mut sync).await.unwrap(); - - // This is a tricky test. Since we receive and emit the event separately, we have to test - // both paths. - - // This first part tests that the event was received correctly (with - // `prev_content` hoisted). - // - // However, we can't simply test that the member is joined since a missing `prev_content` - // is considered to be `"membership": "invite"` by default, which would still work out - // correctly. Hence we test that his display name was changed. - let room = client.get_joined_room(&room_id).await.unwrap(); - let room = room.read().await; - let member = room.joined_members.get(&user_id).unwrap(); - assert_eq!(*member.display_name.as_ref().unwrap(), "changed"); - - // The second part tests that the event is emitted correctly. If `prev_content` were - // missing, this bool would had been flipped. - assert!(passed.load(Ordering::SeqCst)) - } - - #[async_test] - async fn test_unrecognized_events() { - use super::*; - - use crate::{EventEmitter, SyncRoom}; - use matrix_sdk_common::{events::EventContent, locks::RwLock}; - use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }; - - struct EE(Arc); - #[async_trait] - impl EventEmitter for EE { - async fn on_custom_event(&self, room: SyncRoom, event: &CustomEvent<'_>) { - if let SyncRoom::Joined(_) = room { - if let CustomEvent::Message(event) = event { - if event.content.event_type() == "m.room.not_real" { - self.0.swap(true, Ordering::SeqCst); - } - } - } - } - } - - let room_id = get_room_id(); - let passed = Arc::new(AtomicBool::default()); - let emitter = EE(Arc::clone(&passed)); - let mut client = get_client().await; - - client.event_emitter = Arc::new(RwLock::new(Some(Box::new(emitter)))); - - // This is needed other wise the `EventBuilder` goes through a de/ser cycle and the `prev_content` is lost. - let event = json!({ - "content": { - "whatever": "you want" - }, - "event_id": "$eventid:foo", - "origin_server_ts": 159026265, - "sender": "@alice:matrix.org", - "type": "m.room.not_real", - "unsigned": { - "age": 85 - } - }); - - let mut joined_rooms: HashMap = HashMap::new(); - let joined_room = serde_json::json!({ - "summary": {}, - "account_data": { - "events": [], - }, - "ephemeral": { - "events": [], - }, - "state": { - "events": [], - }, - "timeline": { - "events": vec![ event ], - "limited": true, - "prev_batch": "t392-516_47314_0_7_1_1_1_11444_1" - }, - "unread_notifications": { - "highlight_count": 0, - "notification_count": 11 - } - }); - joined_rooms.insert(room_id, joined_room); - - let empty_room: HashMap = HashMap::new(); - let body = serde_json::json!({ - "device_one_time_keys_count": {}, - "next_batch": "s526_47314_0_7_1_1_1_11444_1", - "device_lists": { - "changed": [], - "left": [] - }, - "rooms": { - "invite": empty_room, - "join": joined_rooms, - "leave": empty_room, - }, - "to_device": { - "events": [] - }, - "presence": { - "events": [] - } - }); - let response = http::Response::builder() - .body(serde_json::to_vec(&body).unwrap()) - .unwrap(); - let mut sync = - matrix_sdk_common::api::r0::sync::sync_events::Response::try_from(response).unwrap(); - - client.receive_sync_response(&mut sync).await.unwrap(); - - assert!(passed.load(Ordering::SeqCst)) - } - - #[async_test] - async fn test_unrecognized_custom_event() { - use super::*; - - use crate::{EventEmitter, SyncRoom}; - use matrix_sdk_common::{api::r0::sync::sync_events, locks::RwLock}; - use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }; - - struct EE(Arc); - #[async_trait] - impl EventEmitter for EE { - async fn on_custom_event(&self, room: SyncRoom, event: &CustomEvent<'_>) { - if let SyncRoom::Joined(_) = room { - if let CustomEvent::Message(custom) = event { - if custom.content.event_type == "m.reaction" - && custom.content.json.get("m.relates_to").is_some() - { - self.0.swap(true, Ordering::SeqCst); - } - } - } - } - } - - let room_id = get_room_id(); - let passed = Arc::new(AtomicBool::default()); - let emitter = EE(Arc::clone(&passed)); - let mut client = get_client().await; - - client.event_emitter = Arc::new(RwLock::new(Some(Box::new(emitter)))); - - // This is needed other wise the `EventBuilder` goes through a de/ser cycle and the `prev_content` is lost. - let event: &serde_json::Value = &test_json::REACTION; - - let mut joined_rooms: HashMap = HashMap::new(); - let joined_room = serde_json::json!({ - "summary": {}, - "account_data": { - "events": [], - }, - "ephemeral": { - "events": [], - }, - "state": { - "events": [], - }, - "timeline": { - "events": vec![ event ], - "limited": true, - "prev_batch": "t392-516_47314_0_7_1_1_1_11444_1" - }, - "unread_notifications": { - "highlight_count": 0, - "notification_count": 11 - } - }); - joined_rooms.insert(room_id, joined_room); - - let empty_room: HashMap = HashMap::new(); - let body = serde_json::json!({ - "device_one_time_keys_count": {}, - "next_batch": "s526_47314_0_7_1_1_1_11444_1", - "device_lists": { - "changed": [], - "left": [] - }, - "rooms": { - "invite": empty_room, - "join": joined_rooms, - "leave": empty_room, - }, - "to_device": { - "events": [] - }, - "presence": { - "events": [] - } - }); - let response = http::Response::builder() - .body(serde_json::to_vec(&body).unwrap()) - .unwrap(); - let mut sync = sync_events::Response::try_from(response).unwrap(); - - client.receive_sync_response(&mut sync).await.unwrap(); - - assert!(passed.load(Ordering::SeqCst)) - } - - #[cfg(feature = "messages")] - #[async_test] - async fn message_queue_redaction_event_store_deser() { - use std::ops::Deref; - - let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); - - let session = Session { - access_token: "1234".to_owned(), - user_id: user_id!("@cheeky_monkey:matrix.org"), - device_id: "DEVICEID".into(), - }; - - let _m = mockito::mock( - "GET", - mockito::Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), - ) - .with_status(200) - .with_body(test_json::SYNC.to_string()) - .create(); - - let dir = tempdir().unwrap(); - // a sync response to populate our JSON store - let config = - BaseClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); - let client = BaseClient::new_with_config(config).unwrap(); - client.restore_login(session.clone()).await.unwrap(); - - let response = http::Response::builder() - .body(serde_json::to_vec(test_json::SYNC.deref()).unwrap()) - .unwrap(); - let mut sync = - matrix_sdk_common::api::r0::sync::sync_events::Response::try_from(response).unwrap(); - - client.receive_sync_response(&mut sync).await.unwrap(); - - let json = serde_json::json!({ - "content": { - "reason": "😀" - }, - "event_id": "$XXXX:localhost", - "origin_server_ts": 151957878, - "sender": "@example:localhost", - "type": "m.room.redaction", - "redacts": "$152037280074GZeOm:localhost" - }); - let mut event: Raw = serde_json::from_value(json).unwrap(); - client - .receive_joined_timeline_event(&room_id, &mut event) - .await - .unwrap(); - - // check that the message has actually been redacted - for room in client.joined_rooms().read().await.values() { - let queue = &room.read().await.messages; - if let crate::events::AnyPossiblyRedactedSyncMessageEvent::Redacted( - crate::events::AnyRedactedSyncMessageEvent::RoomMessage(event), - ) = &queue.msgs[0] - { - // this is the id from the message event in the sync response - assert_eq!(event.event_id, event_id!("$152037280074GZeOm:localhost")) - } else { - panic!("message event in message queue should be redacted") - } - } - - // `receive_joined_timeline_event` does not save the state to the store - // so we must do it ourselves - client.store_room_state(&room_id).await.unwrap(); - - // we load state from the store only - let config = - BaseClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); - let client = BaseClient::new_with_config(config).unwrap(); - client.restore_login(session).await.unwrap(); - - // make sure that our redacted message event is redacted and that ser/de works - // properly - for room in client.joined_rooms().read().await.values() { - let queue = &room.read().await.messages; - if let crate::events::AnyPossiblyRedactedSyncMessageEvent::Redacted( - crate::events::AnyRedactedSyncMessageEvent::RoomMessage(event), - ) = &queue.msgs[0] - { - // this is the id from the message event in the sync response - assert_eq!(event.event_id, event_id!("$152037280074GZeOm:localhost")) - } else { - panic!("[post store sync] message event in message queue should be redacted") - } - } - } - - #[async_test] - #[cfg(feature = "encryption")] - async fn test_group_session_invalidation() { - let client = get_client().await; - let room_id = get_room_id(); - - let mut sync_response = EventBuilder::default() - .add_state_event(EventsJson::Member) - .build_sync_response(); - - client - .receive_sync_response(&mut sync_response) - .await - .unwrap(); - - assert!(client.should_share_group_session(&room_id).await); - let _ = client.share_group_session(&room_id).await.unwrap(); - assert!(!client.should_share_group_session(&room_id).await); - client.invalidate_group_session(&room_id).await; - } -} +mod test {} diff --git a/matrix_sdk_base/src/lib.rs b/matrix_sdk_base/src/lib.rs index 6f4377b4..0efe5ded 100644 --- a/matrix_sdk_base/src/lib.rs +++ b/matrix_sdk_base/src/lib.rs @@ -28,7 +28,6 @@ //! of Synapse in compliance with the Matrix API specification. #![deny( missing_debug_implementations, - dead_code, missing_docs, trivial_casts, trivial_numeric_casts, @@ -46,24 +45,10 @@ pub use matrix_sdk_common::*; mod client; mod error; -mod event_emitter; -mod models; mod session; -mod state; pub use client::{BaseClient, BaseClientConfig, RoomState, RoomStateType}; -pub use event_emitter::{CustomEvent, EventEmitter, SyncRoom}; -pub use models::{Room, RoomMember}; -pub use state::{AllRooms, ClientState}; #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub use matrix_sdk_crypto as crypto; - -#[cfg(feature = "messages")] -#[cfg_attr(feature = "docs", doc(cfg(messages)))] -pub use models::{MessageQueue, PossiblyRedactedExt}; - -#[cfg(not(target_arch = "wasm32"))] -pub use state::JsonStore; -pub use state::StateStore; diff --git a/matrix_sdk_base/src/state/json_store.rs b/matrix_sdk_base/src/state/json_store.rs index 1eea500b..2556e96e 100644 --- a/matrix_sdk_base/src/state/json_store.rs +++ b/matrix_sdk_base/src/state/json_store.rs @@ -214,190 +214,4 @@ impl StateStore for JsonStore { } #[cfg(test)] -mod test { - use super::*; - - use std::path::PathBuf; - - use tempfile::tempdir; - - use crate::{ - identifiers::{room_id, user_id}, - push::Ruleset, - BaseClient, BaseClientConfig, Session, - }; - - use matrix_sdk_test::{sync_response, SyncResponseFile}; - - #[tokio::test] - async fn test_store_client_state() { - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - - let user = user_id!("@example:example.com"); - - let sess = Session { - access_token: "32nj9zu034btz90".to_string(), - user_id: user.clone(), - device_id: "Tester".into(), - }; - - let state = ClientState { - sync_token: Some("hello".into()), - ignored_users: vec![user], - push_ruleset: None::, - }; - - let mut path_with_user = PathBuf::from(path); - path_with_user.push(sess.user_id.localpart()); - // we have to set the path since `JsonStore::store_client_state()` doesn't append to the path - let store = JsonStore::open(path_with_user).unwrap(); - store.store_client_state(state.clone()).await.unwrap(); - - // the newly loaded store sets it own user_id local part when `load_client_state` - let store = JsonStore::open(path).unwrap(); - let loaded = store.load_client_state(&sess).await.unwrap(); - assert_eq!(loaded, Some(state)); - } - - #[tokio::test] - async fn test_store_load_joined_room_state() { - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - let store = JsonStore::open(path).unwrap(); - - let id = room_id!("!roomid:example.com"); - let user = user_id!("@example:example.com"); - - let room = Room::new(&id, &user); - store - .store_room_state(RoomState::Joined(&room)) - .await - .unwrap(); - let AllRooms { joined, .. } = store.load_all_rooms().await.unwrap(); - assert_eq!(joined.get(&id), Some(&Room::new(&id, &user))); - } - - #[tokio::test] - async fn test_store_load_left_room_state() { - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - let store = JsonStore::open(path).unwrap(); - - let id = room_id!("!roomid:example.com"); - let user = user_id!("@example:example.com"); - - let room = Room::new(&id, &user); - store - .store_room_state(RoomState::Left(&room)) - .await - .unwrap(); - let AllRooms { left, .. } = store.load_all_rooms().await.unwrap(); - assert_eq!(left.get(&id), Some(&Room::new(&id, &user))); - } - - #[tokio::test] - async fn test_store_load_invited_room_state() { - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - let store = JsonStore::open(path).unwrap(); - - let id = room_id!("!roomid:example.com"); - let user = user_id!("@example:example.com"); - - let room = Room::new(&id, &user); - store - .store_room_state(RoomState::Invited(&room)) - .await - .unwrap(); - let AllRooms { invited, .. } = store.load_all_rooms().await.unwrap(); - assert_eq!(invited.get(&id), Some(&Room::new(&id, &user))); - } - - #[tokio::test] - async fn test_store_load_join_leave_room_state() { - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - let store = JsonStore::open(path).unwrap(); - - let id = room_id!("!roomid:example.com"); - let user = user_id!("@example:example.com"); - - let room = Room::new(&id, &user); - store - .store_room_state(RoomState::Joined(&room)) - .await - .unwrap(); - assert!(store - .delete_room_state(RoomState::Joined(&id)) - .await - .is_ok()); - let AllRooms { joined, .. } = store.load_all_rooms().await.unwrap(); - - // test that we have removed the correct room - assert!(joined.is_empty()); - } - - #[tokio::test] - async fn test_store_load_invite_join_room_state() { - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - let store = JsonStore::open(path).unwrap(); - - let id = room_id!("!roomid:example.com"); - let user = user_id!("@example:example.com"); - - let room = Room::new(&id, &user); - store - .store_room_state(RoomState::Invited(&room)) - .await - .unwrap(); - assert!(store - .delete_room_state(RoomState::Invited(&id)) - .await - .is_ok()); - let AllRooms { invited, .. } = store.load_all_rooms().await.unwrap(); - // test that we have removed the correct room - assert!(invited.is_empty()); - } - - #[tokio::test] - async fn test_client_sync_store() { - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - - let session = Session { - access_token: "1234".to_owned(), - user_id: user_id!("@cheeky_monkey:matrix.org"), - device_id: "DEVICEID".into(), - }; - - // a sync response to populate our JSON store - let store = Box::new(JsonStore::open(path).unwrap()); - let client = - BaseClient::new_with_config(BaseClientConfig::new().state_store(store)).unwrap(); - client.restore_login(session.clone()).await.unwrap(); - - let mut response = sync_response(SyncResponseFile::Default); - - // gather state to save to the db, the first time through loading will be skipped - client.receive_sync_response(&mut response).await.unwrap(); - - // now syncing the client will update from the state store - let store = Box::new(JsonStore::open(path).unwrap()); - let client = - BaseClient::new_with_config(BaseClientConfig::new().state_store(store)).unwrap(); - client.restore_login(session.clone()).await.unwrap(); - - // assert the synced client and the logged in client are equal - assert_eq!(*client.session().read().await, Some(session)); - assert_eq!( - client.sync_token().await, - Some("s526_47314_0_7_1_1_1_11444_1".to_string()) - ); - assert_eq!( - *client.ignored_users.read().await, - vec![user_id!("@someone:example.org")] - ); - } -} +mod test {} diff --git a/matrix_sdk_base/src/state/mod.rs b/matrix_sdk_base/src/state/mod.rs index ffd53d58..de724220 100644 --- a/matrix_sdk_base/src/state/mod.rs +++ b/matrix_sdk_base/src/state/mod.rs @@ -56,27 +56,6 @@ impl PartialEq for ClientState { } } -impl ClientState { - /// Create a JSON serialize-able `ClientState`. - /// - /// This enables non sensitive information to be saved by `JsonStore`. - #[allow(clippy::eval_order_dependence)] - // TODO is this ok ^^^?? https://github.com/rust-lang/rust-clippy/issues/4637 - pub async fn from_base_client(client: &BaseClient) -> ClientState { - let BaseClient { - sync_token, - ignored_users, - push_ruleset, - .. - } = client; - Self { - sync_token: sync_token.read().await.clone(), - ignored_users: ignored_users.read().await.clone(), - push_ruleset: push_ruleset.read().await.clone(), - } - } -} - /// `JsonStore::load_all_rooms` returns `AllRooms`. /// /// `AllRooms` is made of the `joined`, `invited` and `left` room maps.