diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index a21cde7c..36e08cfa 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -32,8 +32,6 @@ matrix-sdk-crypto = { path = "../matrix_sdk_crypto", optional = true } # Misc dependencies thiserror = "1.0.16" tracing = "0.1.13" -atomic = "0.4.5" -dashmap = "3.11.1" [dependencies.tracing-futures] version = "0.2.4" @@ -45,12 +43,6 @@ version = "0.2.20" default-features = false features = ["sync", "time", "fs"] -[dependencies.sqlx] -version = "0.3.4" -optional = true -default-features = false -features = ["runtime-tokio", "sqlite"] - [dev-dependencies] tokio = { version = "0.2.20", features = ["rt-threaded", "macros"] } ruma-identifiers = { version = "0.16.1", features = ["rand"] } diff --git a/matrix_sdk/src/async_client.rs b/matrix_sdk/src/async_client.rs index dde9d8ba..1c99ec58 100644 --- a/matrix_sdk/src/async_client.rs +++ b/matrix_sdk/src/async_client.rs @@ -17,7 +17,6 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; -use std::ops::Deref; use std::result::Result as StdResult; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -50,21 +49,21 @@ use crate::models::Room; use crate::session::Session; use crate::state::StateStore; use crate::VERSION; -use crate::{Error, EventEmitter, Result, RoomStateType}; +use crate::{Error, EventEmitter, Result}; const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); -#[derive(Clone)] /// An async/await enabled Matrix client. /// /// All of the state is held in an `Arc` so the `AsyncClient` can be cloned freely. +#[derive(Clone)] pub struct AsyncClient { /// The URL of the homeserver to connect to. homeserver: Url, /// The underlying HTTP client. http_client: reqwest::Client, /// User session data. - pub(crate) base_client: Arc>, + pub(crate) base_client: BaseClient, } impl std::fmt::Debug for AsyncClient { @@ -286,24 +285,22 @@ impl AsyncClient { let http_client = http_client.default_headers(headers).build()?; - let mut base_client = BaseClient::new(session)?; - - if let Some(store) = config.state_store { - base_client.state_store = Some(store); + let base_client = if let Some(store) = config.state_store { + BaseClient::new_with_state_store(session, store)? + } else { + BaseClient::new(session)? }; Ok(Self { homeserver, http_client, - base_client: Arc::new(RwLock::new(base_client)), + base_client, }) } /// Is the client logged in. pub async fn logged_in(&self) -> bool { - // TODO turn this into a atomic bool so this method doesn't need to be - // async. - self.base_client.read().await.logged_in() + self.base_client.logged_in().await } /// The Homeserver of the client. @@ -315,7 +312,7 @@ impl AsyncClient { /// /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. pub async fn add_event_emitter(&mut self, emitter: Box) { - self.base_client.write().await.event_emitter = Some(emitter); + self.base_client.add_event_emitter(emitter).await; } /// Returns an `Option` of the room name from a `RoomId`. @@ -323,41 +320,67 @@ impl AsyncClient { /// This is a human readable room name. pub async fn get_room_name(&self, room_id: &RoomId) -> Option { // TODO do we want to use the `RoomStateType` enum here or should we have - // 3 seperate `room_name` methods. The other option is to remove this and have + // 3 separate `room_name` methods. The other option is to remove this and have // the user get a `Room` and use `Room::calculate_name` method? - self.base_client - .read() - .await - .calculate_room_name(room_id) - .await + self.base_client.calculate_room_name(room_id).await } /// Returns a `Vec` of the room names this client knows about. /// /// This is a human readable list of room names. pub async fn get_room_names(&self) -> Vec { - self.base_client.read().await.calculate_room_names().await + // TODO same as get_room_name + self.base_client.calculate_room_names().await } /// Returns the joined rooms this client knows about. /// /// A `HashMap` of room id to `matrix::models::Room` - pub async fn get_joined_rooms(&self) -> HashMap>> { - self.base_client.read().await.joined_rooms.clone() + pub fn joined_rooms(&self) -> Arc>>>> { + self.base_client.joined_rooms() } /// Returns the invited rooms this client knows about. /// /// A `HashMap` of room id to `matrix::models::Room` - pub async fn get_invited_rooms(&self) -> HashMap>> { - self.base_client.read().await.invited_rooms.clone() + pub async fn invited_rooms( + &self, + ) -> Arc>>>> { + self.base_client.invited_rooms() } /// Returns the left rooms this client knows about. /// /// A `HashMap` of room id to `matrix::models::Room` - pub async fn get_left_rooms(&self) -> HashMap>> { - self.base_client.read().await.lefted_rooms.clone() + pub async fn left_rooms(&self) -> Arc>>>> { + self.base_client.left_rooms() + } + + /// Get a joined room with the given room id. + /// + /// # Arguments + /// + /// `room_id` - The unique id of the room that should be fetched. + pub async fn get_joined_room(&self, room_id: &RoomId) -> Option>> { + self.base_client.get_joined_room(room_id).await + } + + /// Get an invited room with the given room id. + /// + /// # Arguments + /// + /// `room_id` - The unique id of the room that should be fetched. + pub async fn get_invited_room(&self, room_id: &RoomId) -> Option>> { + self.base_client.get_invited_room(room_id).await + } + + /// Get a left room with the given room id. + /// + /// # Arguments + /// + /// `room_id` - The unique id of the room that should be fetched. + pub async fn get_left_room(&self, room_id: &RoomId) -> Option>> { + self.base_client.get_left_room(room_id).await } /// This allows `AsyncClient` to manually sync state with the provided `StateStore`. @@ -384,7 +407,7 @@ impl AsyncClient { /// # }); /// ``` pub async fn sync_with_state_store(&self) -> Result { - self.base_client.write().await.sync_with_state_store().await + self.base_client.sync_with_state_store().await } /// Login to the server. @@ -419,9 +442,7 @@ impl AsyncClient { }; let response = self.send(request).await?; - let mut client = self.base_client.write().await; - - client.receive_login_response(&response).await?; + self.base_client.receive_login_response(&response).await?; Ok(response) } @@ -642,7 +663,7 @@ impl AsyncClient { pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result { { // if the client has been synced from the state store don't sync again - if !self.base_client.read().await.is_state_store_synced() { + if !self.base_client.is_state_store_synced() { // this will bail out returning false if the store has not been set up if let Ok(synced) = self.sync_with_state_store().await { if synced { @@ -663,236 +684,13 @@ impl AsyncClient { let mut response = self.send(request).await?; - // when events change state updated signals to state store to update database - let mut updated = self.iter_joined_rooms(&mut response).await?; - - if self.iter_invited_rooms(&response).await? { - updated = true; - } - - if self.iter_left_rooms(&mut response).await? { - updated = true; - } - - let mut client = self.base_client.write().await; - client.receive_sync_response(&mut response, updated).await?; + self.base_client + .receive_sync_response(&mut response) + .await?; Ok(response) } - async fn iter_joined_rooms(&self, response: &mut sync_events::Response) -> Result { - let mut updated = false; - for (room_id, joined_room) in &mut response.rooms.join { - let matrix_room = { - let mut client = self.base_client.write().await; - for event in &joined_room.state.events { - if let Ok(e) = event.deserialize() { - if client.receive_joined_state_event(&room_id, &e).await { - updated = true; - } - } - } - - client.get_or_create_joined_room(&room_id).clone() - }; - - // RoomSummary contains information for calculating room name - matrix_room - .write() - .await - .set_room_summary(&joined_room.summary); - - // re looping is not ideal here - for event in &mut joined_room.state.events { - if let Ok(e) = event.deserialize() { - let client = self.base_client.read().await; - client - .emit_state_event(&room_id, &e, RoomStateType::Joined) - .await; - } - } - - for mut event in &mut joined_room.timeline.events { - let decrypted_event = { - let mut client = self.base_client.write().await; - let (decrypt_ev, timeline_update) = client - .receive_joined_timeline_event(room_id, &mut event) - .await; - if timeline_update { - updated = true; - }; - decrypt_ev - }; - - if let Some(e) = decrypted_event { - *event = e; - } - - if let Ok(e) = event.deserialize() { - let client = self.base_client.read().await; - client - .emit_timeline_event(&room_id, &e, RoomStateType::Joined) - .await; - } - } - - // look at AccountData to further cut down users by collecting ignored users - if let Some(account_data) = &joined_room.account_data { - for account_data in &account_data.events { - { - if let Ok(e) = account_data.deserialize() { - let mut client = self.base_client.write().await; - if client.receive_account_data_event(&room_id, &e).await { - updated = true; - } - client - .emit_account_data_event(room_id, &e, RoomStateType::Joined) - .await; - } - } - } - } - - // After the room has been created and state/timeline events accounted for we use the room_id of the newly created - // room to add any presence events that relate to a user in the current room. This is not super - // efficient but we need a room_id so we would loop through now or later. - for presence in &mut response.presence.events { - { - if let Ok(e) = presence.deserialize() { - let mut client = self.base_client.write().await; - if client.receive_presence_event(&room_id, &e).await { - updated = true; - } - - client - .emit_presence_event(&room_id, &e, RoomStateType::Joined) - .await; - } - } - } - - for ephemeral in &mut joined_room.ephemeral.events { - { - if let Ok(e) = ephemeral.deserialize() { - let mut client = self.base_client.write().await; - if client.receive_ephemeral_event(&room_id, &e).await { - updated = true; - } - - client - .emit_ephemeral_event(&room_id, &e, RoomStateType::Joined) - .await; - } - } - } - - if updated { - if let Some(store) = self.base_client.read().await.state_store.as_ref() { - store - .store_room_state(matrix_room.read().await.deref()) - .await?; - } - } - } - Ok(updated) - } - - async fn iter_left_rooms(&self, response: &mut sync_events::Response) -> Result { - let mut updated = false; - for (room_id, left_room) in &mut response.rooms.leave { - let matrix_room = { - let mut client = self.base_client.write().await; - for event in &left_room.state.events { - if let Ok(e) = event.deserialize() { - if client.receive_left_state_event(&room_id, &e).await { - updated = true; - } - } - } - - client.get_or_create_left_room(&room_id).clone() - }; - - for event in &mut left_room.state.events { - if let Ok(e) = event.deserialize() { - let client = self.base_client.read().await; - client - .emit_state_event(&room_id, &e, RoomStateType::Left) - .await; - } - } - - for mut event in &mut left_room.timeline.events { - let decrypted_event = { - let mut client = self.base_client.write().await; - let (decrypt_ev, timeline_update) = client - .receive_left_timeline_event(room_id, &mut event) - .await; - if timeline_update { - updated = true; - }; - decrypt_ev - }; - - if let Some(e) = decrypted_event { - *event = e; - } - - if let Ok(e) = event.deserialize() { - let client = self.base_client.read().await; - client - .emit_timeline_event(&room_id, &e, RoomStateType::Left) - .await; - } - } - - if updated { - if let Some(store) = self.base_client.read().await.state_store.as_ref() { - store - .store_room_state(matrix_room.read().await.deref()) - .await?; - } - } - } - Ok(updated) - } - - async fn iter_invited_rooms(&self, response: &sync_events::Response) -> Result { - let mut updated = false; - for (room_id, invited_room) in &response.rooms.invite { - let matrix_room = { - let mut client = self.base_client.write().await; - for event in &invited_room.invite_state.events { - if let Ok(e) = event.deserialize() { - if client.receive_invite_state_event(&room_id, &e).await { - updated = true; - } - } - } - - client.get_or_create_left_room(&room_id).clone() - }; - - for event in &invited_room.invite_state.events { - if let Ok(e) = event.deserialize() { - let client = self.base_client.read().await; - client - .emit_stripped_state_event(&room_id, &e, RoomStateType::Invited) - .await; - } - } - - if updated { - if let Some(store) = self.base_client.read().await.state_store.as_ref() { - store - .store_room_state(matrix_room.read().await.deref()) - .await?; - } - } - } - Ok(updated) - } - /// Repeatedly call sync to synchronize the client state with the server. /// /// # Arguments @@ -975,7 +773,7 @@ impl AsyncClient { #[cfg(feature = "encryption")] { - if self.base_client.read().await.should_upload_keys().await { + if self.base_client.should_upload_keys().await { let response = self.keys_upload().await; if let Err(e) = response { @@ -983,7 +781,7 @@ impl AsyncClient { } } - if self.base_client.read().await.should_query_keys().await { + if self.base_client.should_query_keys().await { let response = self.keys_query().await; if let Err(e) = response { @@ -1048,9 +846,9 @@ impl AsyncClient { }; let request_builder = if Request::METADATA.requires_authentication { - let client = self.base_client.read().await; + let session = self.base_client.session().read().await; - if let Some(ref session) = client.session { + if let Some(session) = session.as_ref() { request_builder.bearer_auth(&session.access_token) } else { return Err(Error::AuthenticationRequired); @@ -1134,8 +932,7 @@ impl AsyncClient { #[cfg(feature = "encryption")] { let encrypted = { - let client = self.base_client.read().await; - let room = client.joined_rooms.get(room_id); + let room = self.base_client.get_joined_room(room_id).await; match room { Some(r) => r.read().await.is_encrypted(), @@ -1145,40 +942,24 @@ impl AsyncClient { if encrypted { let missing_sessions = { - let client = self.base_client.read().await; - let room = client.joined_rooms.get(room_id); + let room = self.base_client.get_joined_room(room_id).await; let room = room.as_ref().unwrap().read().await; let users = room.members.keys(); - self.base_client - .read() - .await - .get_missing_sessions(users) - .await? + self.base_client.get_missing_sessions(users).await? }; if !missing_sessions.is_empty() { self.claim_one_time_keys(missing_sessions).await?; } - if self - .base_client - .read() - .await - .should_share_group_session(room_id) - .await - { + if self.base_client.should_share_group_session(room_id).await { // TODO we need to make sure that only one such request is // in flight per room at a time. self.share_group_session(room_id).await?; } raw_content = serde_json::value::to_raw_value( - &self - .base_client - .read() - .await - .encrypt(room_id, content) - .await?, + &self.base_client.encrypt(room_id, content).await?, )?; event_type = EventType::RoomEncrypted; } @@ -1219,8 +1000,6 @@ impl AsyncClient { let response = self.send(request).await?; self.base_client - .read() - .await .receive_keys_claim_response(&response) .await?; Ok(response) @@ -1242,8 +1021,6 @@ impl AsyncClient { async fn share_group_session(&self, room_id: &RoomId) -> Result<()> { let mut requests = self .base_client - .read() - .await .share_group_session(room_id) .await .expect("Keys don't need to be uploaded"); @@ -1270,8 +1047,6 @@ impl AsyncClient { async fn keys_upload(&self) -> Result { let (device_keys, one_time_keys) = self .base_client - .read() - .await .keys_for_upload() .await .expect("Keys don't need to be uploaded"); @@ -1289,8 +1064,6 @@ impl AsyncClient { let response = self.send(request).await?; self.base_client - .read() - .await .receive_keys_upload_response(&response) .await?; Ok(response) @@ -1299,7 +1072,7 @@ impl AsyncClient { /// Get the current, if any, sync token of the client. /// This will be None if the client didn't sync at least once. pub async fn sync_token(&self) -> Option { - self.base_client.read().await.sync_token.clone() + self.base_client.sync_token().await } /// Query the server for users device keys. @@ -1313,8 +1086,6 @@ impl AsyncClient { async fn keys_query(&self) -> Result { let mut users_for_query = self .base_client - .read() - .await .users_for_key_query() .await .expect("Keys don't need to be uploaded"); @@ -1338,8 +1109,6 @@ impl AsyncClient { let response = self.send(request).await?; self.base_client - .read() - .await .receive_keys_query_response(&response) .await?; diff --git a/matrix_sdk/src/base_client.rs b/matrix_sdk/src/base_client.rs index c6099580..3a7d5a01 100644 --- a/matrix_sdk/src/base_client.rs +++ b/matrix_sdk/src/base_client.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; #[cfg(feature = "encryption")] use std::collections::{BTreeMap, HashSet}; use std::fmt; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; #[cfg(feature = "encryption")] @@ -38,6 +39,7 @@ use crate::session::Session; use crate::state::{ClientState, StateStore}; use crate::EventEmitter; +use std::ops::Deref; #[cfg(feature = "encryption")] use tokio::sync::Mutex; use tokio::sync::RwLock; @@ -81,65 +83,37 @@ pub enum RoomState { Invited(Arc>), } -macro_rules! emit_event { - ($this:ident, $id:ident, $event:ident, $state:ident, $($variant:path => $meth:ident,)*) => { - match &$event { - $($variant(ev) => { - if let Some(ee) = &$this.event_emitter { - match $state { - RoomStateType::Joined => { - if let Some(room) = $this.get_joined_room(&$id) { - ee.$meth(RoomState::Joined(Arc::clone(&room)), &ev).await; - } - } - RoomStateType::Invited => { - if let Some(room) = $this.get_invited_room(&$id) { - ee.$meth(RoomState::Invited(Arc::clone(&room)), &ev).await; - } - } - RoomStateType::Left => { - if let Some(room) = $this.get_left_room(&$id) { - ee.$meth(RoomState::Left(Arc::clone(&room)), &ev).await; - } - } - } - } - })* - _ => {} - } - } -} - /// A no IO Client implementation. /// /// This Client is a state machine that receives responses and events and /// accordingly updates it's state. +#[derive(Clone)] pub struct Client { /// The current client session containing our user id, device id and access /// token. - pub session: Option, + session: Arc>>, /// The current sync token that should be used for the next sync call. - pub sync_token: Option, + pub(crate) sync_token: Arc>>, /// A map of the rooms our user is joined in. - pub joined_rooms: HashMap>>, + joined_rooms: Arc>>>>, /// A map of the rooms our user is invited to. - pub invited_rooms: HashMap>>, + invited_rooms: Arc>>>>, /// A map of the rooms our user has left. - pub lefted_rooms: HashMap>>, + left_rooms: Arc>>>>, /// A list of ignored users. - pub ignored_users: Vec, + pub(crate) ignored_users: Arc>>, /// The push ruleset for the logged in user. - pub push_ruleset: Option, + pub(crate) push_ruleset: Arc>>, /// Any implementor of EventEmitter will act as the callbacks for various /// events. - pub event_emitter: Option>, + event_emitter: Arc>>>, /// Any implementor of `StateStore` will be called to save `Room` and /// some `BaseClient` state during `AsyncClient::sync` calls. /// /// There is a default implementation `JsonStore` that saves JSON to disk. - pub state_store: Option>, + state_store: Arc>>>, /// Does the `Client` need to sync with the state store. - needs_state_store_sync: bool, + needs_state_store_sync: Arc, #[cfg(feature = "encryption")] olm: Arc>>, @@ -166,6 +140,26 @@ impl Client { /// * `session` - An optional session if the user already has one from a /// previous login call. pub fn new(session: Option) -> Result { + Client::new_helper(session, None) + } + + /// Create a new client. + /// + /// # Arguments + /// + /// * `session` - An optional session if the user already has one from a + /// previous login call. + /// + /// * `store` - An open state store implementation that will be used through + /// the lifetime of the client. + pub fn new_with_state_store( + session: Option, + store: Box, + ) -> Result { + Client::new_helper(session, Some(store)) + } + + fn new_helper(session: Option, store: Option>) -> Result { #[cfg(feature = "encryption")] let olm = match &session { Some(s) => Some(OlmMachine::new(&s.user_id, &s.device_id)), @@ -173,53 +167,62 @@ impl Client { }; Ok(Client { - session, - sync_token: None, - joined_rooms: HashMap::new(), - invited_rooms: HashMap::new(), - lefted_rooms: HashMap::new(), - ignored_users: Vec::new(), - push_ruleset: None, - event_emitter: None, - state_store: None, - needs_state_store_sync: true, + session: Arc::new(RwLock::new(session)), + sync_token: Arc::new(RwLock::new(None)), + joined_rooms: Arc::new(RwLock::new(HashMap::new())), + invited_rooms: Arc::new(RwLock::new(HashMap::new())), + left_rooms: Arc::new(RwLock::new(HashMap::new())), + ignored_users: Arc::new(RwLock::new(Vec::new())), + push_ruleset: Arc::new(RwLock::new(None)), + event_emitter: Arc::new(RwLock::new(None)), + state_store: Arc::new(RwLock::new(store)), + needs_state_store_sync: Arc::new(AtomicBool::from(true)), #[cfg(feature = "encryption")] olm: Arc::new(Mutex::new(olm)), }) } + /// The current client session containing our user id, device id and access + /// token. + pub fn session(&self) -> &Arc>> { + &self.session + } + /// Is the client logged in. - pub fn logged_in(&self) -> bool { - self.session.is_some() + pub async fn logged_in(&self) -> bool { + // TODO turn this into a atomic bool so this method doesn't need to be + // async. + self.session.read().await.is_some() } /// Add `EventEmitter` to `Client`. /// /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. - pub async fn add_event_emitter(&mut self, emitter: Box) { - self.event_emitter = Some(emitter); + pub async fn add_event_emitter(&self, emitter: Box) { + *self.event_emitter.write().await = Some(emitter); } /// Returns true if the state store has been loaded into the client. pub fn is_state_store_synced(&self) -> bool { - !self.needs_state_store_sync + !self.needs_state_store_sync.load(Ordering::Relaxed) } /// When a client is provided the state store will load state from the `StateStore`. /// /// Returns `true` when a state store sync has successfully completed. - pub(crate) async fn sync_with_state_store(&mut self) -> Result { - if let Some(store) = self.state_store.as_ref() { - if let Some(sess) = self.session.as_ref() { + pub(crate) async fn sync_with_state_store(&self) -> Result { + let store = self.state_store.read().await; + if let Some(store) = store.as_ref() { + if let Some(sess) = self.session.read().await.as_ref() { if let Some(client_state) = store.load_client_state(sess).await? { let ClientState { sync_token, ignored_users, push_ruleset, } = client_state; - self.sync_token = sync_token; - self.ignored_users = ignored_users; - self.push_ruleset = push_ruleset; + *self.sync_token.write().await = sync_token; + *self.ignored_users.write().await = ignored_users; + *self.push_ruleset.write().await = push_ruleset; } else { // return false and continues with a sync request then save the state and create // and populate the files during the sync @@ -227,15 +230,15 @@ impl Client { } let mut rooms = store.load_all_rooms().await?; - self.joined_rooms = rooms + *self.joined_rooms.write().await = rooms .drain() .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) .collect(); - self.needs_state_store_sync = false; + self.needs_state_store_sync.store(false, Ordering::Relaxed); } } - Ok(!self.needs_state_store_sync) + Ok(!self.needs_state_store_sync.load(Ordering::Relaxed)) } /// Receive a login response and update the session of the client. @@ -245,7 +248,7 @@ impl Client { /// * `response` - A successful login response that contains our access token /// and device id. pub async fn receive_login_response( - &mut self, + &self, response: &api::session::login::Response, ) -> Result<()> { let session = Session { @@ -253,7 +256,7 @@ impl Client { device_id: response.device_id.clone(), user_id: response.user_id.clone(), }; - self.session = Some(session); + *self.session.write().await = Some(session); #[cfg(feature = "encryption")] { @@ -265,7 +268,7 @@ impl Client { } pub(crate) async fn calculate_room_name(&self, room_id: &RoomId) -> Option { - if let Some(room) = self.joined_rooms.get(room_id) { + if let Some(room) = self.joined_rooms.read().await.get(room_id) { let room = room.read().await; Some(room.room_name.calculate_name(&room.members)) } else { @@ -275,81 +278,126 @@ impl Client { pub(crate) async fn calculate_room_names(&self) -> Vec { let mut res = Vec::new(); - for room in self.joined_rooms.values() { + for room in self.joined_rooms.read().await.values() { let room = room.read().await; res.push(room.room_name.calculate_name(&room.members)) } res } - pub(crate) fn get_or_create_joined_room(&mut self, room_id: &RoomId) -> &mut Arc> { + pub(crate) async fn get_or_create_joined_room(&self, room_id: &RoomId) -> Arc> { #[allow(clippy::or_fun_call)] - self.joined_rooms + let mut rooms = self.joined_rooms.write().await; + rooms .entry(room_id.clone()) .or_insert(Arc::new(RwLock::new(Room::new( room_id, &self .session + .read() + .await .as_ref() .expect("Receiving events while not being logged in") .user_id, )))) + .clone() } - pub(crate) fn get_joined_room(&self, room_id: &RoomId) -> Option<&Arc>> { - self.joined_rooms.get(room_id) + /// Get a joined room with the given room id. + /// + /// # Arguments + /// + /// `room_id` - The unique id of the room that should be fetched. + pub(crate) async fn get_joined_room(&self, room_id: &RoomId) -> Option>> { + self.joined_rooms.read().await.get(room_id).cloned() } - pub(crate) fn get_or_create_invited_room( - &mut self, - room_id: &RoomId, - ) -> &mut Arc> { + /// Returns the joined rooms this client knows about. + /// + /// A `HashMap` of room id to `matrix::models::Room` + pub fn joined_rooms(&self) -> Arc>>>> { + self.joined_rooms.clone() + } + + pub(crate) async fn get_or_create_invited_room(&self, room_id: &RoomId) -> Arc> { #[allow(clippy::or_fun_call)] - self.invited_rooms + let mut rooms = self.invited_rooms.write().await; + rooms .entry(room_id.clone()) .or_insert(Arc::new(RwLock::new(Room::new( room_id, &self .session + .read() + .await .as_ref() .expect("Receiving events while not being logged in") .user_id, )))) + .clone() } - pub(crate) fn get_invited_room(&self, room_id: &RoomId) -> Option<&Arc>> { - self.invited_rooms.get(room_id) + /// Get an invited room with the given room id. + /// + /// # Arguments + /// + /// `room_id` - The unique id of the room that should be fetched. + pub(crate) async fn get_invited_room(&self, room_id: &RoomId) -> Option>> { + self.invited_rooms.read().await.get(room_id).cloned() } - pub(crate) fn get_or_create_left_room(&mut self, room_id: &RoomId) -> &mut Arc> { + /// Returns the invited rooms this client knows about. + /// + /// A `HashMap` of room id to `matrix::models::Room` + pub fn invited_rooms(&self) -> Arc>>>> { + self.invited_rooms.clone() + } + + pub(crate) async fn get_or_create_left_room(&self, room_id: &RoomId) -> Arc> { #[allow(clippy::or_fun_call)] - self.lefted_rooms + let mut rooms = self.left_rooms.write().await; + rooms .entry(room_id.clone()) .or_insert(Arc::new(RwLock::new(Room::new( room_id, &self .session + .read() + .await .as_ref() .expect("Receiving events while not being logged in") .user_id, )))) + .clone() } - pub(crate) fn get_left_room(&self, room_id: &RoomId) -> Option<&Arc>> { - self.lefted_rooms.get(room_id) + /// Get an left room with the given room id. + /// + /// # Arguments + /// + /// `room_id` - The unique id of the room that should be fetched. + pub(crate) async fn get_left_room(&self, room_id: &RoomId) -> Option>> { + self.left_rooms.read().await.get(room_id).cloned() + } + + /// Returns the left rooms this client knows about. + /// + /// A `HashMap` of room id to `matrix::models::Room` + pub fn left_rooms(&self) -> Arc>>>> { + self.left_rooms.clone() } /// Handle a m.ignored_user_list event, updating the room state if necessary. /// /// Returns true if the room name changed, false otherwise. - pub(crate) fn handle_ignored_users(&mut self, event: &IgnoredUserListEvent) -> bool { + pub(crate) async fn handle_ignored_users(&self, event: &IgnoredUserListEvent) -> bool { // this avoids cloning every UserId for the eq check - if self.ignored_users.iter().collect::>() + if self.ignored_users.read().await.iter().collect::>() == event.content.ignored_users.iter().collect::>() { false } else { - self.ignored_users = event.content.ignored_users.to_vec(); + *self.ignored_users.write().await = event.content.ignored_users.to_vec(); true } } @@ -357,7 +405,7 @@ impl Client { /// Handle a m.ignored_user_list event, updating the room state if necessary. /// /// Returns true if the room name changed, false otherwise. - pub(crate) fn handle_push_rules(&mut self, event: &PushRulesEvent) -> bool { + pub(crate) async fn handle_push_rules(&self, event: &PushRulesEvent) -> bool { // TODO this is basically a stub // TODO ruma removed PartialEq for evens, so this doesn't work anymore. // Returning always true for now should be ok here since those don't @@ -365,7 +413,7 @@ impl Client { // if self.push_ruleset.as_ref() == Some(&event.content.global) { // false // } else { - self.push_ruleset = Some(event.content.global.clone()); + *self.push_ruleset.write().await = Some(event.content.global.clone()); true // } } @@ -381,7 +429,7 @@ impl Client { /// /// * `event` - The event that should be handled by the client. pub async fn receive_joined_timeline_event( - &mut self, + &self, room_id: &RoomId, event: &mut EventJson, ) -> (Option>, bool) { @@ -405,7 +453,8 @@ impl Client { } } - let mut room = self.get_or_create_joined_room(&room_id).write().await; + let room_lock = self.get_or_create_joined_room(&room_id).await; + let mut room = room_lock.write().await; (decrypted_event, room.receive_timeline_event(&e)) } _ => (None, false), @@ -422,12 +471,9 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - pub async fn receive_joined_state_event( - &mut self, - room_id: &RoomId, - event: &StateEvent, - ) -> bool { - let mut room = self.get_or_create_joined_room(room_id).write().await; + pub async fn receive_joined_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool { + let room_lock = self.get_or_create_joined_room(room_id).await; + let mut room = room_lock.write().await; room.receive_state_event(event) } @@ -442,11 +488,12 @@ impl Client { /// /// * `event` - A `AnyStrippedStateEvent` that should be handled by the client. pub async fn receive_invite_state_event( - &mut self, + &self, room_id: &RoomId, event: &AnyStrippedStateEvent, ) -> bool { - let mut room = self.get_or_create_invited_room(room_id).write().await; + let room_lock = self.get_or_create_invited_room(room_id).await; + let mut room = room_lock.write().await; room.receive_stripped_state_event(event) } @@ -461,7 +508,7 @@ impl Client { /// /// * `event` - The event that should be handled by the client. pub async fn receive_left_timeline_event( - &mut self, + &self, room_id: &RoomId, event: &mut EventJson, ) -> (Option>, bool) { @@ -485,7 +532,8 @@ impl Client { } } - let mut room = self.get_or_create_left_room(&room_id).write().await; + let room_lock = self.get_or_create_left_room(room_id).await; + let mut room = room_lock.write().await; (decrypted_event, room.receive_timeline_event(&e)) } _ => (None, false), @@ -502,8 +550,9 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - pub async fn receive_left_state_event(&mut self, room_id: &RoomId, event: &StateEvent) -> bool { - let mut room = self.get_or_create_left_room(room_id).write().await; + pub async fn receive_left_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool { + let room_lock = self.get_or_create_left_room(room_id).await; + let mut room = room_lock.write().await; room.receive_state_event(event) } @@ -517,13 +566,9 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - pub async fn receive_presence_event( - &mut self, - room_id: &RoomId, - event: &PresenceEvent, - ) -> bool { + pub async fn receive_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) -> bool { // this should be the room that was just created in the `Client::sync` loop. - if let Some(room) = self.get_joined_room(room_id) { + if let Some(room) = self.get_joined_room(room_id).await { let mut room = room.write().await; room.receive_presence_event(event) } else { @@ -540,15 +585,11 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The presence event for a specified room member. - pub async fn receive_account_data_event( - &mut self, - room_id: &RoomId, - event: &NonRoomEvent, - ) -> bool { + pub async fn receive_account_data_event(&self, room_id: &RoomId, event: &NonRoomEvent) -> bool { match event { - NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu), + NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu).await, NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await, - NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr), + NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr).await, _ => false, } } @@ -562,19 +603,21 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The presence event for a specified room member. - pub async fn receive_ephemeral_event( - &mut self, - room_id: &RoomId, - event: &NonRoomEvent, - ) -> bool { + pub async fn receive_ephemeral_event(&self, room_id: &RoomId, event: &NonRoomEvent) -> bool { match event { - NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu), + NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu).await, NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await, - NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr), + NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr).await, _ => false, } } + /// Get the current, if any, sync token of the client. + /// This will be None if the client didn't sync at least once. + pub async fn sync_token(&self) -> Option { + self.sync_token.read().await.clone() + } + /// Receive a response from a sync call. /// /// # Arguments @@ -583,11 +626,10 @@ impl Client { /// /// * `did_update` - Signals to the `StateStore` if the client state needs updating. pub async fn receive_sync_response( - &mut self, + &self, response: &mut api::sync::sync_events::Response, - did_update: bool, ) -> Result<()> { - self.sync_token = Some(response.next_batch.clone()); + *self.sync_token.write().await = Some(response.next_batch.clone()); #[cfg(feature = "encryption")] { @@ -599,8 +641,8 @@ impl Client { // TODO once the base client deals with callbacks move this into the // part where we already iterate through the rooms to avoid yet // another room loop. - for room in self.joined_rooms.values() { - let room = room.write().await; + for room in self.joined_rooms.read().await.values() { + let room = room.read().await; if !room.is_encrypted() { continue; } @@ -610,15 +652,228 @@ impl Client { } } - if did_update { - if let Some(store) = self.state_store.as_ref() { - let state = ClientState::from_base_client(&self); + // when events change state updated signals to state store to update database + let mut updated = self.iter_joined_rooms(response).await?; + + if self.iter_invited_rooms(&response).await? { + updated = true; + } + + if self.iter_left_rooms(response).await? { + updated = true; + } + + if updated { + let store = self.state_store.read().await; + + if let Some(store) = store.as_ref() { + let state = ClientState::from_base_client(&self).await; store.store_client_state(state).await?; } } Ok(()) } + async fn iter_joined_rooms( + &self, + response: &mut api::sync::sync_events::Response, + ) -> Result { + let mut updated = false; + for (room_id, joined_room) in &mut response.rooms.join { + let matrix_room = { + for event in &joined_room.state.events { + if let Ok(e) = event.deserialize() { + if self.receive_joined_state_event(&room_id, &e).await { + updated = true; + } + } + } + + self.get_or_create_joined_room(&room_id).await.clone() + }; + + // RoomSummary contains information for calculating room name + matrix_room + .write() + .await + .set_room_summary(&joined_room.summary); + + // re looping is not ideal here + for event in &mut joined_room.state.events { + if let Ok(e) = event.deserialize() { + self.emit_state_event(&room_id, &e, RoomStateType::Joined) + .await; + } + } + + for mut event in &mut joined_room.timeline.events { + let decrypted_event = { + let (decrypt_ev, timeline_update) = self + .receive_joined_timeline_event(room_id, &mut event) + .await; + if timeline_update { + updated = true; + }; + decrypt_ev + }; + + if let Some(e) = decrypted_event { + *event = e; + } + + if let Ok(e) = event.deserialize() { + self.emit_timeline_event(&room_id, &e, RoomStateType::Joined) + .await; + } + } + + // look at AccountData to further cut down users by collecting ignored users + if let Some(account_data) = &joined_room.account_data { + for account_data in &account_data.events { + { + if let Ok(e) = account_data.deserialize() { + if self.receive_account_data_event(&room_id, &e).await { + updated = true; + } + self.emit_account_data_event(room_id, &e, RoomStateType::Joined) + .await; + } + } + } + } + + // After the room has been created and state/timeline events accounted for we use the room_id of the newly created + // room to add any presence events that relate to a user in the current room. This is not super + // efficient but we need a room_id so we would loop through now or later. + for presence in &mut response.presence.events { + { + if let Ok(e) = presence.deserialize() { + if self.receive_presence_event(&room_id, &e).await { + updated = true; + } + + self.emit_presence_event(&room_id, &e, RoomStateType::Joined) + .await; + } + } + } + + for ephemeral in &mut joined_room.ephemeral.events { + { + if let Ok(e) = ephemeral.deserialize() { + if self.receive_ephemeral_event(&room_id, &e).await { + updated = true; + } + + self.emit_ephemeral_event(&room_id, &e, RoomStateType::Joined) + .await; + } + } + } + + if updated { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .store_room_state(matrix_room.read().await.deref()) + .await?; + } + } + } + Ok(updated) + } + + async fn iter_left_rooms( + &self, + response: &mut api::sync::sync_events::Response, + ) -> Result { + let mut updated = false; + for (room_id, left_room) in &mut response.rooms.leave { + let matrix_room = { + for event in &left_room.state.events { + if let Ok(e) = event.deserialize() { + if self.receive_left_state_event(&room_id, &e).await { + updated = true; + } + } + } + + self.get_or_create_left_room(&room_id).await.clone() + }; + + for event in &mut left_room.state.events { + if let Ok(e) = event.deserialize() { + self.emit_state_event(&room_id, &e, RoomStateType::Left) + .await; + } + } + + for mut event in &mut left_room.timeline.events { + let decrypted_event = { + let (decrypt_ev, timeline_update) = + self.receive_left_timeline_event(room_id, &mut event).await; + if timeline_update { + updated = true; + }; + decrypt_ev + }; + + if let Some(e) = decrypted_event { + *event = e; + } + + if let Ok(e) = event.deserialize() { + self.emit_timeline_event(&room_id, &e, RoomStateType::Left) + .await; + } + } + + if updated { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .store_room_state(matrix_room.read().await.deref()) + .await?; + } + } + } + Ok(updated) + } + + async fn iter_invited_rooms( + &self, + response: &api::sync::sync_events::Response, + ) -> Result { + let mut updated = false; + for (room_id, invited_room) in &response.rooms.invite { + let matrix_room = { + for event in &invited_room.invite_state.events { + if let Ok(e) = event.deserialize() { + if self.receive_invite_state_event(&room_id, &e).await { + updated = true; + } + } + } + + self.get_or_create_left_room(&room_id).await.clone() + }; + + for event in &invited_room.invite_state.events { + if let Ok(e) = event.deserialize() { + self.emit_stripped_state_event(&room_id, &e, RoomStateType::Invited) + .await; + } + } + + if updated { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .store_room_state(matrix_room.read().await.deref()) + .await?; + } + } + } + Ok(updated) + } + /// Should account or one-time keys be uploaded to the server. #[cfg(feature = "encryption")] #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))] @@ -686,7 +941,7 @@ impl Client { &self, room_id: &RoomId, ) -> Result> { - let room = self.get_joined_room(room_id).expect("No room found"); + let room = self.get_joined_room(room_id).await.expect("No room found"); let mut olm = self.olm.lock().await; match &mut *olm { @@ -809,35 +1064,62 @@ impl Client { event: &RoomEvent, room_state: RoomStateType, ) { - emit_event!( - self, room_id, event, room_state, - RoomEvent::RoomMember => on_room_member, - RoomEvent::RoomName => on_room_name, - RoomEvent::RoomCanonicalAlias => on_room_canonical_alias, - RoomEvent::RoomAliases => on_room_aliases, - RoomEvent::RoomAvatar => on_room_avatar, - RoomEvent::RoomMessage => on_room_message, - RoomEvent::RoomMessageFeedback => on_room_message_feedback, - RoomEvent::RoomRedaction => on_room_redaction, - RoomEvent::RoomPowerLevels => on_room_power_levels, - RoomEvent::RoomTombstone => on_room_tombstone, - ); - } + let lock = self.event_emitter.read().await; + let event_emitter = if let Some(ee) = lock.as_ref() { + ee + } else { + return; + }; - pub(crate) async fn emit_stripped_state_event( - &self, - room_id: &RoomId, - event: &AnyStrippedStateEvent, - room_state: RoomStateType, - ) { - emit_event!( - self, room_id, event, room_state, - AnyStrippedStateEvent::RoomMember => on_stripped_state_member, - AnyStrippedStateEvent::RoomName => on_stripped_state_name, - AnyStrippedStateEvent::RoomCanonicalAlias => on_stripped_state_canonical_alias, - AnyStrippedStateEvent::RoomAliases => on_stripped_state_aliases, - AnyStrippedStateEvent::RoomAvatar => on_stripped_state_avatar, - ); + let room = match room_state { + RoomStateType::Invited => { + if let Some(room) = self.get_invited_room(&room_id).await { + RoomState::Invited(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Joined => { + if let Some(room) = self.get_joined_room(&room_id).await { + RoomState::Joined(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Left => { + if let Some(room) = self.get_left_room(&room_id).await { + RoomState::Left(Arc::clone(&room)) + } else { + return; + } + } + }; + + match event { + RoomEvent::RoomMember(mem) => event_emitter.on_room_member(room, &mem).await, + RoomEvent::RoomName(name) => event_emitter.on_room_name(room, &name).await, + RoomEvent::RoomCanonicalAlias(canonical) => { + event_emitter + .on_room_canonical_alias(room, &canonical) + .await + } + RoomEvent::RoomAliases(aliases) => event_emitter.on_room_aliases(room, &aliases).await, + RoomEvent::RoomAvatar(avatar) => event_emitter.on_room_avatar(room, &avatar).await, + RoomEvent::RoomMessage(msg) => event_emitter.on_room_message(room, &msg).await, + RoomEvent::RoomMessageFeedback(msg_feedback) => { + event_emitter + .on_room_message_feedback(room, &msg_feedback) + .await + } + RoomEvent::RoomRedaction(redaction) => { + event_emitter.on_room_redaction(room, &redaction).await + } + RoomEvent::RoomPowerLevels(power) => { + event_emitter.on_room_power_levels(room, &power).await + } + RoomEvent::RoomTombstone(tomb) => event_emitter.on_room_tombstone(room, &tomb).await, + _ => {} + } } pub(crate) async fn emit_state_event( @@ -846,16 +1128,129 @@ impl Client { event: &StateEvent, room_state: RoomStateType, ) { - emit_event!( - self, room_id, event, room_state, - StateEvent::RoomMember => on_state_member, - StateEvent::RoomName => on_state_name, - StateEvent::RoomCanonicalAlias => on_state_canonical_alias, - StateEvent::RoomAliases => on_state_aliases, - StateEvent::RoomAvatar => on_state_avatar, - StateEvent::RoomPowerLevels => on_state_power_levels, - StateEvent::RoomTombstone => on_room_tombstone, - ); + let lock = self.event_emitter.read().await; + let event_emitter = if let Some(ee) = lock.as_ref() { + ee + } else { + return; + }; + + let room = match room_state { + RoomStateType::Invited => { + if let Some(room) = self.get_invited_room(&room_id).await { + RoomState::Invited(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Joined => { + if let Some(room) = self.get_joined_room(&room_id).await { + RoomState::Joined(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Left => { + if let Some(room) = self.get_left_room(&room_id).await { + RoomState::Left(Arc::clone(&room)) + } else { + return; + } + } + }; + + match event { + StateEvent::RoomMember(member) => event_emitter.on_state_member(room, &member).await, + StateEvent::RoomName(name) => event_emitter.on_state_name(room, &name).await, + StateEvent::RoomCanonicalAlias(canonical) => { + event_emitter + .on_state_canonical_alias(room, &canonical) + .await + } + StateEvent::RoomAliases(aliases) => { + event_emitter.on_state_aliases(room, &aliases).await + } + StateEvent::RoomAvatar(avatar) => event_emitter.on_state_avatar(room, &avatar).await, + StateEvent::RoomPowerLevels(power) => { + event_emitter.on_state_power_levels(room, &power).await + } + StateEvent::RoomJoinRules(rules) => { + event_emitter.on_state_join_rules(room, &rules).await + } + StateEvent::RoomTombstone(tomb) => event_emitter.on_room_tombstone(room, &tomb).await, + _ => {} + } + } + + pub(crate) async fn emit_stripped_state_event( + &self, + room_id: &RoomId, + event: &AnyStrippedStateEvent, + room_state: RoomStateType, + ) { + let lock = self.event_emitter.read().await; + let event_emitter = if let Some(ee) = lock.as_ref() { + ee + } else { + return; + }; + + let room = match room_state { + RoomStateType::Invited => { + if let Some(room) = self.get_invited_room(&room_id).await { + RoomState::Invited(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Joined => { + if let Some(room) = self.get_joined_room(&room_id).await { + RoomState::Joined(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Left => { + if let Some(room) = self.get_left_room(&room_id).await { + RoomState::Left(Arc::clone(&room)) + } else { + return; + } + } + }; + + match event { + AnyStrippedStateEvent::RoomMember(member) => { + event_emitter.on_stripped_state_member(room, &member).await + } + AnyStrippedStateEvent::RoomName(name) => { + event_emitter.on_stripped_state_name(room, &name).await + } + AnyStrippedStateEvent::RoomCanonicalAlias(canonical) => { + event_emitter + .on_stripped_state_canonical_alias(room, &canonical) + .await + } + AnyStrippedStateEvent::RoomAliases(aliases) => { + event_emitter + .on_stripped_state_aliases(room, &aliases) + .await + } + AnyStrippedStateEvent::RoomAvatar(avatar) => { + event_emitter.on_stripped_state_avatar(room, &avatar).await + } + AnyStrippedStateEvent::RoomPowerLevels(power) => { + event_emitter + .on_stripped_state_power_levels(room, &power) + .await + } + AnyStrippedStateEvent::RoomJoinRules(rules) => { + event_emitter + .on_stripped_state_join_rules(room, &rules) + .await + } + _ => {} + } } pub(crate) async fn emit_account_data_event( @@ -864,13 +1259,54 @@ impl Client { event: &NonRoomEvent, room_state: RoomStateType, ) { - emit_event!( - self, room_id, event, room_state, - NonRoomEvent::Presence => on_account_presence, - NonRoomEvent::IgnoredUserList => on_account_ignored_users, - NonRoomEvent::PushRules => on_account_push_rules, - NonRoomEvent::FullyRead => on_account_data_fully_read, - ); + let lock = self.event_emitter.read().await; + let event_emitter = if let Some(ee) = lock.as_ref() { + ee + } else { + return; + }; + + let room = match room_state { + RoomStateType::Invited => { + if let Some(room) = self.get_invited_room(&room_id).await { + RoomState::Invited(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Joined => { + if let Some(room) = self.get_joined_room(&room_id).await { + RoomState::Joined(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Left => { + if let Some(room) = self.get_left_room(&room_id).await { + RoomState::Left(Arc::clone(&room)) + } else { + return; + } + } + }; + + match event { + NonRoomEvent::Presence(presence) => { + event_emitter.on_account_presence(room, &presence).await + } + NonRoomEvent::IgnoredUserList(ignored) => { + event_emitter.on_account_ignored_users(room, &ignored).await + } + NonRoomEvent::PushRules(rules) => { + event_emitter.on_account_push_rules(room, &rules).await + } + NonRoomEvent::FullyRead(full_read) => { + event_emitter + .on_account_data_fully_read(room, &full_read) + .await + } + _ => {} + } } pub(crate) async fn emit_ephemeral_event( @@ -879,13 +1315,54 @@ impl Client { event: &NonRoomEvent, room_state: RoomStateType, ) { - emit_event!( - self, room_id, event, room_state, - NonRoomEvent::Presence => on_account_presence, - NonRoomEvent::IgnoredUserList => on_account_ignored_users, - NonRoomEvent::PushRules => on_account_push_rules, - NonRoomEvent::FullyRead => on_account_data_fully_read, - ); + let lock = self.event_emitter.read().await; + let event_emitter = if let Some(ee) = lock.as_ref() { + ee + } else { + return; + }; + + let room = match room_state { + RoomStateType::Invited => { + if let Some(room) = self.get_invited_room(&room_id).await { + RoomState::Invited(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Joined => { + if let Some(room) = self.get_joined_room(&room_id).await { + RoomState::Joined(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Left => { + if let Some(room) = self.get_left_room(&room_id).await { + RoomState::Left(Arc::clone(&room)) + } else { + return; + } + } + }; + + match event { + NonRoomEvent::Presence(presence) => { + event_emitter.on_account_presence(room, &presence).await + } + NonRoomEvent::IgnoredUserList(ignored) => { + event_emitter.on_account_ignored_users(room, &ignored).await + } + NonRoomEvent::PushRules(rules) => { + event_emitter.on_account_push_rules(room, &rules).await + } + NonRoomEvent::FullyRead(full_read) => { + event_emitter + .on_account_data_fully_read(room, &full_read) + .await + } + _ => {} + } } pub(crate) async fn emit_presence_event( @@ -894,27 +1371,31 @@ impl Client { event: &PresenceEvent, room_state: RoomStateType, ) { - if let Some(ee) = &self.event_emitter { - match room_state { - RoomStateType::Invited => { - if let Some(room) = self.get_invited_room(&room_id) { - ee.on_presence_event(RoomState::Invited(Arc::clone(&room)), &event) - .await; - } - } - RoomStateType::Joined => { - if let Some(room) = self.get_joined_room(&room_id) { - ee.on_presence_event(RoomState::Joined(Arc::clone(&room)), &event) - .await; - } - } - RoomStateType::Left => { - if let Some(room) = self.get_left_room(&room_id) { - ee.on_presence_event(RoomState::Left(Arc::clone(&room)), &event) - .await; - } + let room = match room_state { + RoomStateType::Invited => { + if let Some(room) = self.get_invited_room(&room_id).await { + RoomState::Invited(Arc::clone(&room)) + } else { + return; } } + RoomStateType::Joined => { + if let Some(room) = self.get_joined_room(&room_id).await { + RoomState::Joined(Arc::clone(&room)) + } else { + return; + } + } + RoomStateType::Left => { + if let Some(room) = self.get_left_room(&room_id).await { + RoomState::Left(Arc::clone(&room)) + } else { + return; + } + } + }; + if let Some(ee) = &self.event_emitter.read().await.as_ref() { + ee.on_presence_event(room, &event).await; } } } @@ -956,7 +1437,8 @@ mod test { let _response = client.sync(sync_settings).await.unwrap(); - let bc = &client.base_client.read().await; - assert_eq!(1, bc.ignored_users.len()) + let bc = &client.base_client; + let ignored_users = bc.ignored_users.read().await; + assert_eq!(1, ignored_users.len()) } } diff --git a/matrix_sdk/src/event_emitter/mod.rs b/matrix_sdk/src/event_emitter/mod.rs index 0ddc77a1..9d1ff735 100644 --- a/matrix_sdk/src/event_emitter/mod.rs +++ b/matrix_sdk/src/event_emitter/mod.rs @@ -266,7 +266,7 @@ mod test { self.0.lock().await.push("account ignore".to_string()) } async fn on_account_push_rules(&self, _: RoomState, _: &PushRulesEvent) { - self.0.lock().await.push("".to_string()) + self.0.lock().await.push("account push rules".to_string()) } async fn on_account_data_fully_read(&self, _: RoomState, _: &FullyReadEvent) { self.0.lock().await.push("account read".to_string()) @@ -317,6 +317,7 @@ mod test { assert_eq!( v.as_slice(), [ + "state rules", "state member", "state aliases", "state power", @@ -396,6 +397,7 @@ mod test { assert_eq!( v.as_slice(), [ + "state rules", "state member", "state aliases", "state power", diff --git a/matrix_sdk/src/models/room.rs b/matrix_sdk/src/models/room.rs index 2224fbe9..148cc1e9 100644 --- a/matrix_sdk/src/models/room.rs +++ b/matrix_sdk/src/models/room.rs @@ -507,7 +507,8 @@ mod test { let _response = client.sync(sync_settings).await.unwrap(); - let rooms = &client.base_client.read().await.joined_rooms; + let rooms_lock = &client.base_client.joined_rooms(); + let rooms = rooms_lock.read().await; let room = &rooms .get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap()) .unwrap() diff --git a/matrix_sdk/src/state/mod.rs b/matrix_sdk/src/state/mod.rs index b670e947..bd201cf0 100644 --- a/matrix_sdk/src/state/mod.rs +++ b/matrix_sdk/src/state/mod.rs @@ -13,13 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use serde::{Deserialize, Serialize}; use std::collections::HashMap; pub mod state_store; pub use state_store::JsonStore; -use serde::{Deserialize, Serialize}; - use crate::base_client::{Client as BaseClient, Token}; use crate::events::push_rules::Ruleset; use crate::identifiers::{RoomId, UserId}; @@ -48,7 +47,7 @@ impl PartialEq for ClientState { } impl ClientState { - pub fn from_base_client(client: &BaseClient) -> ClientState { + pub async fn from_base_client(client: &BaseClient) -> ClientState { let BaseClient { sync_token, ignored_users, @@ -56,9 +55,9 @@ impl ClientState { .. } = client; Self { - sync_token: sync_token.clone(), - ignored_users: ignored_users.clone(), - push_ruleset: push_ruleset.clone(), + sync_token: sync_token.read().await.clone(), + ignored_users: ignored_users.read().await.clone(), + push_ruleset: push_ruleset.read().await.clone(), } } } diff --git a/matrix_sdk/src/state/state_store.rs b/matrix_sdk/src/state/state_store.rs index 6e085ee9..75c9771c 100644 --- a/matrix_sdk/src/state/state_store.rs +++ b/matrix_sdk/src/state/state_store.rs @@ -287,16 +287,16 @@ mod test { AsyncClient::new_with_config(homeserver, Some(session.clone()), config).unwrap(); client.sync(sync_settings).await.unwrap(); - let base_client = client.base_client.read().await; + let base_client = &client.base_client; // assert the synced client and the logged in client are equal - assert_eq!(base_client.session, Some(session)); + assert_eq!(*base_client.session().read().await, Some(session)); assert_eq!( - base_client.sync_token, + base_client.sync_token().await, Some("s526_47314_0_7_1_1_1_11444_1".to_string()) ); assert_eq!( - base_client.ignored_users, + *base_client.ignored_users.read().await, vec![UserId::try_from("@someone:example.org").unwrap()] ); } diff --git a/matrix_sdk/src/test_builder.rs b/matrix_sdk/src/test_builder.rs index 1b18c8f3..8f173285 100644 --- a/matrix_sdk/src/test_builder.rs +++ b/matrix_sdk/src/test_builder.rs @@ -343,13 +343,11 @@ impl ClientTestRunner { } async fn stream_client_events(&mut self) { - let mut cli = self + let cli = &self .client .as_ref() .expect("`AsyncClient` must be set use `ClientTestRunner::set_client`") - .base_client - .write() - .await; + .base_client; let room_id = &self.room_user_id.0; diff --git a/matrix_sdk_crypto/src/machine.rs b/matrix_sdk_crypto/src/machine.rs index b041532a..c3e63c60 100644 --- a/matrix_sdk_crypto/src/machine.rs +++ b/matrix_sdk_crypto/src/machine.rs @@ -984,7 +984,7 @@ impl OlmMachine { .map_err(|_| EventError::UnsupportedOlmType)?; // Decrypt the OlmMessage and get a Ruma event out of it. - let (mut decrypted_event, signing_key) = self + let (decrypted_event, signing_key) = self .decrypt_olm_message(&event.sender, &content.sender_key, message) .await?; @@ -992,14 +992,23 @@ impl OlmMachine { // Handle the decrypted event, e.g. fetch out Megolm sessions out of // the event. - self.handle_decrypted_to_device_event( - &content.sender_key, - &signing_key, - &mut decrypted_event, - ) - .await?; - - Ok(decrypted_event) + if let Some(event) = self + .handle_decrypted_to_device_event( + &content.sender_key, + &signing_key, + &decrypted_event, + ) + .await? + { + // Some events may have sensitive data e.g. private keys, while we + // wan't to notify our users that a private key was received we + // don't want them to be able to do silly things with it. Handling + // events modifies them and returns a modified one, so replace it + // here if we get one. + Ok(event) + } else { + Ok(decrypted_event) + } } else { warn!("Olm event doesn't contain a ciphertext for our key"); Err(EventError::MissingCiphertext.into()) @@ -1012,7 +1021,7 @@ impl OlmMachine { sender_key: &str, signing_key: &str, event: &mut ToDeviceRoomKey, - ) -> OlmResult<()> { + ) -> OlmResult>> { match event.content.algorithm { Algorithm::MegolmV1AesSha2 => { let session_key = GroupSessionKey(mem::take(&mut event.content.session_key)); @@ -1024,14 +1033,24 @@ impl OlmMachine { session_key, )?; let _ = self.store.save_inbound_group_session(session).await?; - Ok(()) + // TODO ideally we would rewrap the event again just like so + // let event = EventJson::from(ToDeviceEvent::RoomKey(event.clone())); + // This saidly lacks a type once it's serialized again, fix + // this in Ruma. + let mut json = serde_json::to_value(event.clone())?; + json.as_object_mut() + .unwrap() + .insert("type".to_owned(), Value::String("m.room_key".to_owned())); + let event = serde_json::from_value::>(json)?; + + Ok(Some(event)) } _ => { warn!( "Received room key with unsupported key algorithm {}", event.content.algorithm ); - Ok(()) + Ok(None) } } } @@ -1330,25 +1349,26 @@ impl OlmMachine { &mut self, sender_key: &str, signing_key: &str, - event: &mut EventJson, - ) -> OlmResult<()> { + event: &EventJson, + ) -> OlmResult>> { let event = if let Ok(e) = event.deserialize() { e } else { warn!("Decrypted to-device event failed to be parsed correctly"); - return Ok(()); + return Ok(None); }; match event { ToDeviceEvent::RoomKey(mut e) => { - self.add_room_key(sender_key, signing_key, &mut e).await + Ok(self.add_room_key(sender_key, signing_key, &mut e).await?) } ToDeviceEvent::ForwardedRoomKey(e) => { - self.add_forwarded_room_key(sender_key, signing_key, &e) + self.add_forwarded_room_key(sender_key, signing_key, &e)?; + Ok(None) } _ => { warn!("Received a unexpected encrypted to-device event"); - Ok(()) + Ok(None) } } } @@ -1657,7 +1677,7 @@ mod test { let mut bob_keys = BTreeMap::new(); - let one_time_key = one_time_keys.iter().nth(0).unwrap(); + let one_time_key = one_time_keys.iter().next().unwrap(); let mut keys = BTreeMap::new(); keys.insert(one_time_key.0.clone(), one_time_key.1.clone()); bob_keys.insert(bob.device_id.clone(), keys); @@ -1820,7 +1840,7 @@ mod test { let identity_keys = machine.account.identity_keys(); let ed25519_key = identity_keys.ed25519(); - let mut one_time_key = one_time_keys.values_mut().nth(0).unwrap(); + let mut one_time_key = one_time_keys.values_mut().next().unwrap(); let ret = machine.verify_json( &machine.user_id, @@ -1848,7 +1868,7 @@ mod test { &machine.user_id, &machine.device_id, ed25519_key, - &mut json!(&mut one_time_keys.as_mut().unwrap().values_mut().nth(0)), + &mut json!(&mut one_time_keys.as_mut().unwrap().values_mut().next()), ); assert!(ret.is_ok()); @@ -1923,7 +1943,7 @@ mod test { let mut bob_keys = BTreeMap::new(); - let one_time_key = one_time_keys.iter().nth(0).unwrap(); + let one_time_key = one_time_keys.iter().next().unwrap(); let mut keys = BTreeMap::new(); keys.insert(one_time_key.0.clone(), one_time_key.1.clone()); bob_keys.insert(bob_machine.device_id.clone(), keys); @@ -2011,6 +2031,7 @@ mod test { if let AnyToDeviceEvent::RoomKey(e) = event.deserialize().unwrap() { assert_eq!(e.sender, alice.user_id); + assert!(e.content.session_key.is_empty()) } else { panic!("Event had the wrong type"); } diff --git a/matrix_sdk_types/Cargo.toml b/matrix_sdk_types/Cargo.toml index 9d016f19..9f3e192f 100644 --- a/matrix_sdk_types/Cargo.toml +++ b/matrix_sdk_types/Cargo.toml @@ -13,6 +13,6 @@ version = "0.1.0" [dependencies] js_int = "0.1.5" ruma-api = "0.16.0" -ruma-client-api = { git = "https://github.com/matrix-org/ruma-client-api" } +ruma-client-api = "0.8.0" ruma-events = "0.21.0" ruma-identifiers = "0.16.1"