From 1639f0fdd8f4d44cf7e9cb33e1888cfaf86dda3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Wed, 6 May 2020 15:36:55 +0200 Subject: [PATCH] async_client: Remove the lock around the base client. --- matrix_sdk/src/async_client.rs | 85 ++++++++--------------------- matrix_sdk/src/base_client.rs | 17 +++--- matrix_sdk/src/models/room.rs | 2 +- matrix_sdk/src/state/state_store.rs | 2 +- matrix_sdk/src/test_builder.rs | 6 +- 5 files changed, 35 insertions(+), 77 deletions(-) diff --git a/matrix_sdk/src/async_client.rs b/matrix_sdk/src/async_client.rs index d9a62962..745b5b71 100644 --- a/matrix_sdk/src/async_client.rs +++ b/matrix_sdk/src/async_client.rs @@ -53,17 +53,17 @@ use crate::{Error, EventEmitter, Result}; const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); -#[derive(Clone)] /// An async/await enabled Matrix client. /// /// All of the state is held in an `Arc` so the `AsyncClient` can be cloned freely. +#[derive(Clone)] pub struct AsyncClient { /// The URL of the homeserver to connect to. homeserver: Url, /// The underlying HTTP client. http_client: reqwest::Client, /// User session data. - pub(crate) base_client: Arc>, + pub(crate) base_client: BaseClient, } impl std::fmt::Debug for AsyncClient { @@ -294,13 +294,13 @@ impl AsyncClient { Ok(Self { homeserver, http_client, - base_client: Arc::new(RwLock::new(base_client)), + base_client, }) } /// Is the client logged in. pub async fn logged_in(&self) -> bool { - self.base_client.read().await.logged_in().await + self.base_client.logged_in().await } /// The Homeserver of the client. @@ -312,36 +312,28 @@ impl AsyncClient { /// /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. pub async fn add_event_emitter(&mut self, emitter: Box) { - self.base_client - .read() - .await - .add_event_emitter(emitter) - .await; + self.base_client.add_event_emitter(emitter).await; } /// Returns an `Option` of the room name from a `RoomId`. /// /// This is a human readable room name. pub async fn get_room_name(&self, room_id: &RoomId) -> Option { - self.base_client - .read() - .await - .calculate_room_name(room_id) - .await + self.base_client.calculate_room_name(room_id).await } /// Returns a `Vec` of the room names this client knows about. /// /// This is a human readable list of room names. pub async fn get_room_names(&self) -> Vec { - self.base_client.read().await.calculate_room_names().await + self.base_client.calculate_room_names().await } /// Returns the rooms this client knows about. /// /// A `HashMap` of room id to `matrix::models::Room` pub async fn get_rooms(&self) -> Arc>>>> { - self.base_client.read().await.joined_rooms.clone() + self.base_client.joined_rooms.clone() } /// This allows `AsyncClient` to manually sync state with the provided `StateStore`. @@ -368,7 +360,7 @@ impl AsyncClient { /// # }); /// ``` pub async fn sync_with_state_store(&self) -> Result { - self.base_client.write().await.sync_with_state_store().await + self.base_client.sync_with_state_store().await } /// Login to the server. @@ -403,9 +395,7 @@ impl AsyncClient { }; let response = self.send(request).await?; - let mut client = self.base_client.write().await; - - client.receive_login_response(&response).await?; + self.base_client.receive_login_response(&response).await?; Ok(response) } @@ -625,7 +615,7 @@ impl AsyncClient { pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result { { // if the client has been synced from the state store don't sync again - if !self.base_client.read().await.is_state_store_synced() { + if !self.base_client.is_state_store_synced() { // this will bail out returning false if the store has not been set up if let Ok(synced) = self.sync_with_state_store().await { if synced { @@ -646,8 +636,9 @@ impl AsyncClient { let mut response = self.send(request).await?; - let client = self.base_client.read().await; - client.receive_sync_response(&mut response).await?; + self.base_client + .receive_sync_response(&mut response) + .await?; Ok(response) } @@ -734,7 +725,7 @@ impl AsyncClient { #[cfg(feature = "encryption")] { - if self.base_client.read().await.should_upload_keys().await { + if self.base_client.should_upload_keys().await { let response = self.keys_upload().await; if let Err(e) = response { @@ -742,7 +733,7 @@ impl AsyncClient { } } - if self.base_client.read().await.should_query_keys().await { + if self.base_client.should_query_keys().await { let response = self.keys_query().await; if let Err(e) = response { @@ -807,8 +798,7 @@ impl AsyncClient { }; let request_builder = if Request::METADATA.requires_authentication { - let client = self.base_client.read().await; - let session = client.session.read().await; + let session = self.base_client.session.read().await; if let Some(session) = session.as_ref() { request_builder.bearer_auth(&session.access_token) @@ -894,8 +884,7 @@ impl AsyncClient { #[cfg(feature = "encryption")] { let encrypted = { - let client = self.base_client.read().await; - let room = client.get_room(room_id).await; + let room = self.base_client.get_room(room_id).await; match room { Some(r) => r.read().await.is_encrypted(), @@ -905,40 +894,24 @@ impl AsyncClient { if encrypted { let missing_sessions = { - let client = self.base_client.read().await; - let room = client.get_room(room_id).await; + let room = self.base_client.get_room(room_id).await; let room = room.as_ref().unwrap().read().await; let users = room.members.keys(); - self.base_client - .read() - .await - .get_missing_sessions(users) - .await? + self.base_client.get_missing_sessions(users).await? }; if !missing_sessions.is_empty() { self.claim_one_time_keys(missing_sessions).await?; } - if self - .base_client - .read() - .await - .should_share_group_session(room_id) - .await - { + if self.base_client.should_share_group_session(room_id).await { // TODO we need to make sure that only one such request is // in flight per room at a time. self.share_group_session(room_id).await?; } raw_content = serde_json::value::to_raw_value( - &self - .base_client - .read() - .await - .encrypt(room_id, content) - .await?, + &self.base_client.encrypt(room_id, content).await?, )?; event_type = EventType::RoomEncrypted; } @@ -979,8 +952,6 @@ impl AsyncClient { let response = self.send(request).await?; self.base_client - .read() - .await .receive_keys_claim_response(&response) .await?; Ok(response) @@ -1002,8 +973,6 @@ impl AsyncClient { async fn share_group_session(&self, room_id: &RoomId) -> Result<()> { let mut requests = self .base_client - .read() - .await .share_group_session(room_id) .await .expect("Keys don't need to be uploaded"); @@ -1030,8 +999,6 @@ impl AsyncClient { async fn keys_upload(&self) -> Result { let (device_keys, one_time_keys) = self .base_client - .read() - .await .keys_for_upload() .await .expect("Keys don't need to be uploaded"); @@ -1049,8 +1016,6 @@ impl AsyncClient { let response = self.send(request).await?; self.base_client - .read() - .await .receive_keys_upload_response(&response) .await?; Ok(response) @@ -1059,7 +1024,7 @@ impl AsyncClient { /// Get the current, if any, sync token of the client. /// This will be None if the client didn't sync at least once. pub async fn sync_token(&self) -> Option { - self.base_client.read().await.sync_token().await + self.base_client.sync_token().await } /// Query the server for users device keys. @@ -1073,8 +1038,6 @@ impl AsyncClient { async fn keys_query(&self) -> Result { let mut users_for_query = self .base_client - .read() - .await .users_for_key_query() .await .expect("Keys don't need to be uploaded"); @@ -1098,8 +1061,6 @@ impl AsyncClient { let response = self.send(request).await?; self.base_client - .read() - .await .receive_keys_query_response(&response) .await?; diff --git a/matrix_sdk/src/base_client.rs b/matrix_sdk/src/base_client.rs index 421812d2..3d38ecee 100644 --- a/matrix_sdk/src/base_client.rs +++ b/matrix_sdk/src/base_client.rs @@ -63,6 +63,7 @@ pub type Token = String; /// /// This Client is a state machine that receives responses and events and /// accordingly updates it's state. +#[derive(Clone)] pub struct Client { /// The current client session containing our user id, device id and access /// token. @@ -173,7 +174,7 @@ impl Client { /// When a client is provided the state store will load state from the `StateStore`. /// /// Returns `true` when a state store sync has successfully completed. - pub(crate) async fn sync_with_state_store(&mut self) -> Result { + pub(crate) async fn sync_with_state_store(&self) -> Result { let store = self.state_store.read().await; if let Some(store) = store.as_ref() { if let Some(sess) = self.session.read().await.as_ref() { @@ -193,12 +194,10 @@ impl Client { } let mut rooms = store.load_all_rooms().await?; - self.joined_rooms = Arc::new(RwLock::new( - rooms - .drain() - .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) - .collect(), - )); + *self.joined_rooms.write().await = rooms + .drain() + .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) + .collect(); self.needs_state_store_sync.store(false, Ordering::Relaxed); } @@ -213,7 +212,7 @@ impl Client { /// * `response` - A successful login response that contains our access token /// and device id. pub async fn receive_login_response( - &mut self, + &self, response: &api::session::login::Response, ) -> Result<()> { let session = Session { @@ -939,7 +938,7 @@ mod test { let _response = client.sync(sync_settings).await.unwrap(); - let bc = &client.base_client.read().await; + let bc = &client.base_client; let ignored_users = bc.ignored_users.read().await; assert_eq!(1, ignored_users.len()) } diff --git a/matrix_sdk/src/models/room.rs b/matrix_sdk/src/models/room.rs index 6cc0b5fe..a5c25f40 100644 --- a/matrix_sdk/src/models/room.rs +++ b/matrix_sdk/src/models/room.rs @@ -493,7 +493,7 @@ mod test { let _response = client.sync(sync_settings).await.unwrap(); - let rooms_lock = &client.base_client.read().await.joined_rooms; + let rooms_lock = &client.base_client.joined_rooms; let rooms = rooms_lock.read().await; let room = &rooms .get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap()) diff --git a/matrix_sdk/src/state/state_store.rs b/matrix_sdk/src/state/state_store.rs index 67880ce5..7bb91aac 100644 --- a/matrix_sdk/src/state/state_store.rs +++ b/matrix_sdk/src/state/state_store.rs @@ -287,7 +287,7 @@ mod test { AsyncClient::new_with_config(homeserver, Some(session.clone()), config).unwrap(); client.sync(sync_settings).await.unwrap(); - let base_client = client.base_client.read().await; + let base_client = &client.base_client; // assert the synced client and the logged in client are equal assert_eq!(*base_client.session.read().await, Some(session)); diff --git a/matrix_sdk/src/test_builder.rs b/matrix_sdk/src/test_builder.rs index a54a6d8e..8f173285 100644 --- a/matrix_sdk/src/test_builder.rs +++ b/matrix_sdk/src/test_builder.rs @@ -343,13 +343,11 @@ impl ClientTestRunner { } async fn stream_client_events(&mut self) { - let cli = self + let cli = &self .client .as_ref() .expect("`AsyncClient` must be set use `ClientTestRunner::set_client`") - .base_client - .write() - .await; + .base_client; let room_id = &self.room_user_id.0;