From 967544bab97c59e00d674750a0d9f634b8c01a34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 6 May 2020 13:57:58 +0200 Subject: [PATCH] base_client: Make the base client a bit more threadsafe. This moves the bulk of the sync logic into the base client, to avoid deadlocks while someone tires to send messages from a event callback the base client needed to get a bunch of locks. Ideally the AsyncClient would not need a lock for the base client at all but we're not there yet. --- matrix_sdk/src/async_client.rs | 114 +---------- matrix_sdk/src/base_client.rs | 282 ++++++++++++++++++---------- matrix_sdk/src/models/room.rs | 3 +- matrix_sdk/src/state/mod.rs | 11 +- matrix_sdk/src/state/state_store.rs | 4 +- matrix_sdk/src/test_builder.rs | 2 +- 6 files changed, 203 insertions(+), 213 deletions(-) diff --git a/matrix_sdk/src/async_client.rs b/matrix_sdk/src/async_client.rs index c304b273..f253baa8 100644 --- a/matrix_sdk/src/async_client.rs +++ b/matrix_sdk/src/async_client.rs @@ -17,7 +17,6 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; -use std::ops::Deref; use std::result::Result as StdResult; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -339,7 +338,7 @@ impl AsyncClient { /// Returns the rooms this client knows about. /// /// A `HashMap` of room id to `matrix::models::Room` - pub async fn get_rooms(&self) -> HashMap>> { + pub async fn get_rooms(&self) -> Arc>>>> { self.base_client.read().await.joined_rooms.clone() } @@ -645,109 +644,8 @@ impl AsyncClient { let mut response = self.send(request).await?; - let mut updated = false; - for (room_id, room) in &mut response.rooms.join { - let matrix_room = { - let mut client = self.base_client.write().await; - for event in &room.state.events { - if let Ok(e) = event.deserialize() { - if client.receive_joined_state_event(&room_id, &e).await { - updated = true; - } - } - } - - client.get_or_create_room(&room_id).clone() - }; - - // RoomSummary contains information for calculating room name - matrix_room.write().await.set_room_summary(&room.summary); - - // re looping is not ideal here - for event in &mut room.state.events { - if let Ok(e) = event.deserialize() { - let client = self.base_client.read().await; - client.emit_state_event(&room_id, &e).await; - } - } - - for mut event in &mut room.timeline.events { - let decrypted_event = { - let mut client = self.base_client.write().await; - let (decrypt_ev, timeline_update) = client - .receive_joined_timeline_event(room_id, &mut event) - .await; - if timeline_update { - updated = true; - }; - decrypt_ev - }; - - if let Some(e) = decrypted_event { - *event = e; - } - - if let Ok(e) = event.deserialize() { - let client = self.base_client.read().await; - client.emit_timeline_event(&room_id, &e).await; - } - } - - // look at AccountData to further cut down users by collecting ignored users - if let Some(account_data) = &room.account_data { - for account_data in &account_data.events { - { - if let Ok(e) = account_data.deserialize() { - let mut client = self.base_client.write().await; - if client.receive_account_data_event(&room_id, &e).await { - updated = true; - } - client.emit_account_data_event(room_id, &e).await; - } - } - } - } - - // After the room has been created and state/timeline events accounted for we use the room_id of the newly created - // room to add any presence events that relate to a user in the current room. This is not super - // efficient but we need a room_id so we would loop through now or later. - for presence in &mut response.presence.events { - { - if let Ok(e) = presence.deserialize() { - let mut client = self.base_client.write().await; - if client.receive_presence_event(&room_id, &e).await { - updated = true; - } - - client.emit_presence_event(&room_id, &e).await; - } - } - } - - for ephemeral in &mut room.ephemeral.events { - { - if let Ok(e) = ephemeral.deserialize() { - let mut client = self.base_client.write().await; - if client.receive_ephemeral_event(&room_id, &e).await { - updated = true; - } - - client.emit_ephemeral_event(&room_id, &e).await; - } - } - } - - if updated { - if let Some(store) = self.base_client.read().await.state_store.as_ref() { - store - .store_room_state(matrix_room.read().await.deref()) - .await?; - } - } - } - - let mut client = self.base_client.write().await; - client.receive_sync_response(&mut response, updated).await?; + let client = self.base_client.read().await; + client.receive_sync_response(&mut response).await?; Ok(response) } @@ -994,7 +892,7 @@ impl AsyncClient { { let encrypted = { let client = self.base_client.read().await; - let room = client.joined_rooms.get(room_id); + let room = client.get_room(room_id).await; match room { Some(r) => r.read().await.is_encrypted(), @@ -1005,7 +903,7 @@ impl AsyncClient { if encrypted { let missing_sessions = { let client = self.base_client.read().await; - let room = client.joined_rooms.get(room_id); + let room = client.get_room(room_id).await; let room = room.as_ref().unwrap().read().await; let users = room.members.keys(); self.base_client @@ -1158,7 +1056,7 @@ impl AsyncClient { /// Get the current, if any, sync token of the client. /// This will be None if the client didn't sync at least once. pub async fn sync_token(&self) -> Option { - self.base_client.read().await.sync_token.clone() + self.base_client.read().await.sync_token().await } /// Query the server for users device keys. diff --git a/matrix_sdk/src/base_client.rs b/matrix_sdk/src/base_client.rs index 121f7627..4055c5bf 100644 --- a/matrix_sdk/src/base_client.rs +++ b/matrix_sdk/src/base_client.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; #[cfg(feature = "encryption")] use std::collections::{BTreeMap, HashSet}; use std::fmt; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; #[cfg(feature = "encryption")] @@ -37,6 +38,7 @@ use crate::session::Session; use crate::state::{ClientState, StateStore}; use crate::EventEmitter; +use std::ops::Deref; #[cfg(feature = "encryption")] use tokio::sync::Mutex; use tokio::sync::RwLock; @@ -66,13 +68,13 @@ pub struct Client { /// token. pub session: Option, /// The current sync token that should be used for the next sync call. - pub sync_token: Option, + pub sync_token: Arc>>, /// A map of the rooms our user is joined in. - pub joined_rooms: HashMap>>, + pub joined_rooms: Arc>>>>, /// A list of ignored users. - pub ignored_users: Vec, + pub ignored_users: Arc>>, /// The push ruleset for the logged in user. - pub push_ruleset: Option, + pub push_ruleset: Arc>>, /// Any implementor of EventEmitter will act as the callbacks for various /// events. pub event_emitter: Option>, @@ -82,7 +84,7 @@ pub struct Client { /// There is a default implementation `JsonStore` that saves JSON to disk. pub state_store: Option>, /// Does the `Client` need to sync with the state store. - needs_state_store_sync: bool, + needs_state_store_sync: Arc, #[cfg(feature = "encryption")] olm: Arc>>, @@ -117,13 +119,13 @@ impl Client { Ok(Client { session, - sync_token: None, - joined_rooms: HashMap::new(), - ignored_users: Vec::new(), - push_ruleset: None, + sync_token: Arc::new(RwLock::new(None)), + joined_rooms: Arc::new(RwLock::new(HashMap::new())), + ignored_users: Arc::new(RwLock::new(Vec::new())), + push_ruleset: Arc::new(RwLock::new(None)), event_emitter: None, state_store: None, - needs_state_store_sync: true, + needs_state_store_sync: Arc::new(AtomicBool::from(true)), #[cfg(feature = "encryption")] olm: Arc::new(Mutex::new(olm)), }) @@ -143,7 +145,7 @@ impl Client { /// Returns true if the state store has been loaded into the client. pub fn is_state_store_synced(&self) -> bool { - !self.needs_state_store_sync + !self.needs_state_store_sync.load(Ordering::Relaxed) } /// When a client is provided the state store will load state from the `StateStore`. @@ -158,9 +160,9 @@ impl Client { ignored_users, push_ruleset, } = client_state; - self.sync_token = sync_token; - self.ignored_users = ignored_users; - self.push_ruleset = push_ruleset; + *self.sync_token.write().await = sync_token; + *self.ignored_users.write().await = ignored_users; + *self.push_ruleset.write().await = push_ruleset; } else { // return false and continues with a sync request then save the state and create // and populate the files during the sync @@ -168,15 +170,17 @@ impl Client { } let mut rooms = store.load_all_rooms().await?; - self.joined_rooms = rooms - .drain() - .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) - .collect(); + self.joined_rooms = Arc::new(RwLock::new( + rooms + .drain() + .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) + .collect(), + )); - self.needs_state_store_sync = false; + self.needs_state_store_sync.store(false, Ordering::Relaxed); } } - Ok(!self.needs_state_store_sync) + Ok(!self.needs_state_store_sync.load(Ordering::Relaxed)) } /// Receive a login response and update the session of the client. @@ -206,7 +210,7 @@ impl Client { } pub(crate) async fn calculate_room_name(&self, room_id: &RoomId) -> Option { - if let Some(room) = self.joined_rooms.get(room_id) { + if let Some(room) = self.joined_rooms.read().await.get(room_id) { let room = room.read().await; Some(room.room_name.calculate_name(&room.members)) } else { @@ -216,16 +220,17 @@ impl Client { pub(crate) async fn calculate_room_names(&self) -> Vec { let mut res = Vec::new(); - for room in self.joined_rooms.values() { + for room in self.joined_rooms.read().await.values() { let room = room.read().await; res.push(room.room_name.calculate_name(&room.members)) } res } - pub(crate) fn get_or_create_room(&mut self, room_id: &RoomId) -> &mut Arc> { + pub(crate) async fn get_or_create_room(&self, room_id: &RoomId) -> Arc> { #[allow(clippy::or_fun_call)] - self.joined_rooms + let mut rooms = self.joined_rooms.write().await; + rooms .entry(room_id.clone()) .or_insert(Arc::new(RwLock::new(Room::new( room_id, @@ -235,23 +240,24 @@ impl Client { .expect("Receiving events while not being logged in") .user_id, )))) + .clone() } - pub(crate) fn get_room(&self, room_id: &RoomId) -> Option<&Arc>> { - self.joined_rooms.get(room_id) + pub(crate) async fn get_room(&self, room_id: &RoomId) -> Option>> { + self.joined_rooms.read().await.get(room_id).cloned() } /// Handle a m.ignored_user_list event, updating the room state if necessary. /// /// Returns true if the room name changed, false otherwise. - pub(crate) fn handle_ignored_users(&mut self, event: &IgnoredUserListEvent) -> bool { + pub(crate) async fn handle_ignored_users(&self, event: &IgnoredUserListEvent) -> bool { // this avoids cloning every UserId for the eq check - if self.ignored_users.iter().collect::>() + if self.ignored_users.read().await.iter().collect::>() == event.content.ignored_users.iter().collect::>() { false } else { - self.ignored_users = event.content.ignored_users.to_vec(); + *self.ignored_users.write().await = event.content.ignored_users.to_vec(); true } } @@ -259,7 +265,7 @@ impl Client { /// Handle a m.ignored_user_list event, updating the room state if necessary. /// /// Returns true if the room name changed, false otherwise. - pub(crate) fn handle_push_rules(&mut self, event: &PushRulesEvent) -> bool { + pub(crate) async fn handle_push_rules(&self, event: &PushRulesEvent) -> bool { // TODO this is basically a stub // TODO ruma removed PartialEq for evens, so this doesn't work anymore. // Returning always true for now should be ok here since those don't @@ -267,7 +273,7 @@ impl Client { // if self.push_ruleset.as_ref() == Some(&event.content.global) { // false // } else { - self.push_ruleset = Some(event.content.global.clone()); + *self.push_ruleset.write().await = Some(event.content.global.clone()); true // } } @@ -283,7 +289,7 @@ impl Client { /// /// * `event` - The event that should be handled by the client. pub async fn receive_joined_timeline_event( - &mut self, + &self, room_id: &RoomId, event: &mut EventJson, ) -> (Option>, bool) { @@ -307,7 +313,8 @@ impl Client { } } - let mut room = self.get_or_create_room(&room_id).write().await; + let room_lock = self.get_or_create_room(&room_id).await; + let mut room = room_lock.write().await; (decrypted_event, room.receive_timeline_event(&e)) } _ => (None, false), @@ -324,12 +331,9 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - pub async fn receive_joined_state_event( - &mut self, - room_id: &RoomId, - event: &StateEvent, - ) -> bool { - let mut room = self.get_or_create_room(room_id).write().await; + pub async fn receive_joined_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool { + let room_lock = self.get_or_create_room(room_id).await; + let mut room = room_lock.write().await; room.receive_state_event(event) } @@ -343,13 +347,9 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - pub async fn receive_presence_event( - &mut self, - room_id: &RoomId, - event: &PresenceEvent, - ) -> bool { + pub async fn receive_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) -> bool { // this should be the room that was just created in the `Client::sync` loop. - if let Some(room) = self.get_room(room_id) { + if let Some(room) = self.get_room(room_id).await { let mut room = room.write().await; room.receive_presence_event(event) } else { @@ -366,15 +366,11 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The presence event for a specified room member. - pub async fn receive_account_data_event( - &mut self, - room_id: &RoomId, - event: &NonRoomEvent, - ) -> bool { + pub async fn receive_account_data_event(&self, room_id: &RoomId, event: &NonRoomEvent) -> bool { match event { - NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu), + NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu).await, NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await, - NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr), + NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr).await, _ => false, } } @@ -388,19 +384,21 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The presence event for a specified room member. - pub async fn receive_ephemeral_event( - &mut self, - room_id: &RoomId, - event: &NonRoomEvent, - ) -> bool { + pub async fn receive_ephemeral_event(&self, room_id: &RoomId, event: &NonRoomEvent) -> bool { match event { - NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu), + NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu).await, NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await, - NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr), + NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr).await, _ => false, } } + /// Get the current, if any, sync token of the client. + /// This will be None if the client didn't sync at least once. + pub async fn sync_token(&self) -> Option { + self.sync_token.read().await.clone() + } + /// Receive a response from a sync call. /// /// # Arguments @@ -409,11 +407,10 @@ impl Client { /// /// * `did_update` - Signals to the `StateStore` if the client state needs updating. pub async fn receive_sync_response( - &mut self, + &self, response: &mut api::sync::sync_events::Response, - did_update: bool, ) -> Result<()> { - self.sync_token = Some(response.next_batch.clone()); + *self.sync_token.write().await = Some(response.next_batch.clone()); #[cfg(feature = "encryption")] { @@ -425,8 +422,8 @@ impl Client { // TODO once the base client deals with callbacks move this into the // part where we already iterate through the rooms to avoid yet // another room loop. - for room in self.joined_rooms.values() { - let room = room.write().await; + for room in self.joined_rooms.read().await.values() { + let room = room.read().await; if !room.is_encrypted() { continue; } @@ -436,9 +433,103 @@ impl Client { } } - if did_update { + let mut updated = false; + for (room_id, room) in &mut response.rooms.join { + let matrix_room = { + for event in &room.state.events { + if let Ok(e) = event.deserialize() { + if self.receive_joined_state_event(&room_id, &e).await { + updated = true; + } + } + } + + self.get_or_create_room(&room_id).await.clone() + }; + + // RoomSummary contains information for calculating room name + matrix_room.write().await.set_room_summary(&room.summary); + + // re looping is not ideal here + for event in &mut room.state.events { + if let Ok(e) = event.deserialize() { + self.emit_state_event(&room_id, &e).await; + } + } + + for mut event in &mut room.timeline.events { + let decrypted_event = { + let (decrypt_ev, timeline_update) = self + .receive_joined_timeline_event(room_id, &mut event) + .await; + if timeline_update { + updated = true; + }; + decrypt_ev + }; + + if let Some(e) = decrypted_event { + *event = e; + } + + if let Ok(e) = event.deserialize() { + self.emit_timeline_event(&room_id, &e).await; + } + } + + // look at AccountData to further cut down users by collecting ignored users + if let Some(account_data) = &room.account_data { + for account_data in &account_data.events { + { + if let Ok(e) = account_data.deserialize() { + if self.receive_account_data_event(&room_id, &e).await { + updated = true; + } + self.emit_account_data_event(room_id, &e).await; + } + } + } + } + + // After the room has been created and state/timeline events accounted for we use the room_id of the newly created + // room to add any presence events that relate to a user in the current room. This is not super + // efficient but we need a room_id so we would loop through now or later. + for presence in &mut response.presence.events { + { + if let Ok(e) = presence.deserialize() { + if self.receive_presence_event(&room_id, &e).await { + updated = true; + } + + self.emit_presence_event(&room_id, &e).await; + } + } + } + + for ephemeral in &mut room.ephemeral.events { + { + if let Ok(e) = ephemeral.deserialize() { + if self.receive_ephemeral_event(&room_id, &e).await { + updated = true; + } + + self.emit_ephemeral_event(&room_id, &e).await; + } + } + } + + if updated { + if let Some(store) = self.state_store.as_ref() { + store + .store_room_state(matrix_room.read().await.deref()) + .await?; + } + } + } + + if updated { if let Some(store) = self.state_store.as_ref() { - let state = ClientState::from_base_client(&self); + let state = ClientState::from_base_client(&self).await; store.store_client_state(state).await?; } } @@ -512,7 +603,7 @@ impl Client { &self, room_id: &RoomId, ) -> Result> { - let room = self.get_room(room_id).expect("No room found"); + let room = self.get_room(room_id).await.expect("No room found"); let mut olm = self.olm.lock().await; match &mut *olm { @@ -633,21 +724,21 @@ impl Client { match event { RoomEvent::RoomMember(mem) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_member(Arc::clone(&room), &mem).await; } } } RoomEvent::RoomName(name) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_name(Arc::clone(&room), &name).await; } } } RoomEvent::RoomCanonicalAlias(canonical) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_canonical_alias(Arc::clone(&room), &canonical) .await; } @@ -655,28 +746,28 @@ impl Client { } RoomEvent::RoomAliases(aliases) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_aliases(Arc::clone(&room), &aliases).await; } } } RoomEvent::RoomAvatar(avatar) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_avatar(Arc::clone(&room), &avatar).await; } } } RoomEvent::RoomMessage(msg) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_message(Arc::clone(&room), &msg).await; } } } RoomEvent::RoomMessageFeedback(msg_feedback) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_message_feedback(Arc::clone(&room), &msg_feedback) .await; } @@ -684,21 +775,21 @@ impl Client { } RoomEvent::RoomRedaction(redaction) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_redaction(Arc::clone(&room), &redaction).await; } } } RoomEvent::RoomPowerLevels(power) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_power_levels(Arc::clone(&room), &power).await; } } } RoomEvent::RoomTombstone(tomb) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_tombstone(Arc::clone(&room), &tomb).await; } } @@ -711,21 +802,21 @@ impl Client { match event { StateEvent::RoomMember(member) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_state_member(Arc::clone(&room), &member).await; } } } StateEvent::RoomName(name) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_state_name(Arc::clone(&room), &name).await; } } } StateEvent::RoomCanonicalAlias(canonical) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_state_canonical_alias(Arc::clone(&room), &canonical) .await; } @@ -733,35 +824,35 @@ impl Client { } StateEvent::RoomAliases(aliases) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_state_aliases(Arc::clone(&room), &aliases).await; } } } StateEvent::RoomAvatar(avatar) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_state_avatar(Arc::clone(&room), &avatar).await; } } } StateEvent::RoomPowerLevels(power) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_state_power_levels(Arc::clone(&room), &power).await; } } } StateEvent::RoomJoinRules(rules) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_state_join_rules(Arc::clone(&room), &rules).await; } } } StateEvent::RoomTombstone(tomb) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_room_tombstone(Arc::clone(&room), &tomb).await; } } @@ -774,14 +865,14 @@ impl Client { match event { NonRoomEvent::Presence(presence) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_presence(Arc::clone(&room), &presence).await; } } } NonRoomEvent::IgnoredUserList(ignored) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_ignored_users(Arc::clone(&room), &ignored) .await; } @@ -789,14 +880,14 @@ impl Client { } NonRoomEvent::PushRules(rules) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_push_rules(Arc::clone(&room), &rules).await; } } } NonRoomEvent::FullyRead(full_read) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_data_fully_read(Arc::clone(&room), &full_read) .await; } @@ -810,14 +901,14 @@ impl Client { match event { NonRoomEvent::Presence(presence) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_presence(Arc::clone(&room), &presence).await; } } } NonRoomEvent::IgnoredUserList(ignored) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_ignored_users(Arc::clone(&room), &ignored) .await; } @@ -825,14 +916,14 @@ impl Client { } NonRoomEvent::PushRules(rules) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_push_rules(Arc::clone(&room), &rules).await; } } } NonRoomEvent::FullyRead(full_read) => { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_account_data_fully_read(Arc::clone(&room), &full_read) .await; } @@ -844,7 +935,7 @@ impl Client { pub(crate) async fn emit_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) { if let Some(ee) = &self.event_emitter { - if let Some(room) = self.get_room(&room_id) { + if let Some(room) = self.get_room(&room_id).await { ee.on_presence_event(Arc::clone(&room), &event).await; } } @@ -889,6 +980,7 @@ mod test { let _response = client.sync(sync_settings).await.unwrap(); let bc = &client.base_client.read().await; - assert_eq!(1, bc.ignored_users.len()) + let ignored_users = bc.ignored_users.read().await; + assert_eq!(1, ignored_users.len()) } } diff --git a/matrix_sdk/src/models/room.rs b/matrix_sdk/src/models/room.rs index 92c808a7..6cc0b5fe 100644 --- a/matrix_sdk/src/models/room.rs +++ b/matrix_sdk/src/models/room.rs @@ -493,7 +493,8 @@ mod test { let _response = client.sync(sync_settings).await.unwrap(); - let rooms = &client.base_client.read().await.joined_rooms; + let rooms_lock = &client.base_client.read().await.joined_rooms; + let rooms = rooms_lock.read().await; let room = &rooms .get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap()) .unwrap() diff --git a/matrix_sdk/src/state/mod.rs b/matrix_sdk/src/state/mod.rs index b670e947..bd201cf0 100644 --- a/matrix_sdk/src/state/mod.rs +++ b/matrix_sdk/src/state/mod.rs @@ -13,13 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use serde::{Deserialize, Serialize}; use std::collections::HashMap; pub mod state_store; pub use state_store::JsonStore; -use serde::{Deserialize, Serialize}; - use crate::base_client::{Client as BaseClient, Token}; use crate::events::push_rules::Ruleset; use crate::identifiers::{RoomId, UserId}; @@ -48,7 +47,7 @@ impl PartialEq for ClientState { } impl ClientState { - pub fn from_base_client(client: &BaseClient) -> ClientState { + pub async fn from_base_client(client: &BaseClient) -> ClientState { let BaseClient { sync_token, ignored_users, @@ -56,9 +55,9 @@ impl ClientState { .. } = client; Self { - sync_token: sync_token.clone(), - ignored_users: ignored_users.clone(), - push_ruleset: push_ruleset.clone(), + sync_token: sync_token.read().await.clone(), + ignored_users: ignored_users.read().await.clone(), + push_ruleset: push_ruleset.read().await.clone(), } } } diff --git a/matrix_sdk/src/state/state_store.rs b/matrix_sdk/src/state/state_store.rs index 6e085ee9..99e2ac9a 100644 --- a/matrix_sdk/src/state/state_store.rs +++ b/matrix_sdk/src/state/state_store.rs @@ -292,11 +292,11 @@ mod test { // assert the synced client and the logged in client are equal assert_eq!(base_client.session, Some(session)); assert_eq!( - base_client.sync_token, + base_client.sync_token().await, Some("s526_47314_0_7_1_1_1_11444_1".to_string()) ); assert_eq!( - base_client.ignored_users, + *base_client.ignored_users.read().await, vec![UserId::try_from("@someone:example.org").unwrap()] ); } diff --git a/matrix_sdk/src/test_builder.rs b/matrix_sdk/src/test_builder.rs index 1b18c8f3..a54a6d8e 100644 --- a/matrix_sdk/src/test_builder.rs +++ b/matrix_sdk/src/test_builder.rs @@ -343,7 +343,7 @@ impl ClientTestRunner { } async fn stream_client_events(&mut self) { - let mut cli = self + let cli = self .client .as_ref() .expect("`AsyncClient` must be set use `ClientTestRunner::set_client`")