async_client: Remove the lock around the base client.
parent
02013ac286
commit
1639f0fdd8
|
@ -53,17 +53,17 @@ use crate::{Error, EventEmitter, Result};
|
||||||
|
|
||||||
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
|
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
/// An async/await enabled Matrix client.
|
/// An async/await enabled Matrix client.
|
||||||
///
|
///
|
||||||
/// All of the state is held in an `Arc` so the `AsyncClient` can be cloned freely.
|
/// All of the state is held in an `Arc` so the `AsyncClient` can be cloned freely.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct AsyncClient {
|
pub struct AsyncClient {
|
||||||
/// The URL of the homeserver to connect to.
|
/// The URL of the homeserver to connect to.
|
||||||
homeserver: Url,
|
homeserver: Url,
|
||||||
/// The underlying HTTP client.
|
/// The underlying HTTP client.
|
||||||
http_client: reqwest::Client,
|
http_client: reqwest::Client,
|
||||||
/// User session data.
|
/// User session data.
|
||||||
pub(crate) base_client: Arc<RwLock<BaseClient>>,
|
pub(crate) base_client: BaseClient,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for AsyncClient {
|
impl std::fmt::Debug for AsyncClient {
|
||||||
|
@ -294,13 +294,13 @@ impl AsyncClient {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
homeserver,
|
homeserver,
|
||||||
http_client,
|
http_client,
|
||||||
base_client: Arc::new(RwLock::new(base_client)),
|
base_client,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Is the client logged in.
|
/// Is the client logged in.
|
||||||
pub async fn logged_in(&self) -> bool {
|
pub async fn logged_in(&self) -> bool {
|
||||||
self.base_client.read().await.logged_in().await
|
self.base_client.logged_in().await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The Homeserver of the client.
|
/// The Homeserver of the client.
|
||||||
|
@ -312,36 +312,28 @@ impl AsyncClient {
|
||||||
///
|
///
|
||||||
/// The methods of `EventEmitter` are called when the respective `RoomEvents` occur.
|
/// The methods of `EventEmitter` are called when the respective `RoomEvents` occur.
|
||||||
pub async fn add_event_emitter(&mut self, emitter: Box<dyn EventEmitter>) {
|
pub async fn add_event_emitter(&mut self, emitter: Box<dyn EventEmitter>) {
|
||||||
self.base_client
|
self.base_client.add_event_emitter(emitter).await;
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.add_event_emitter(emitter)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an `Option` of the room name from a `RoomId`.
|
/// Returns an `Option` of the room name from a `RoomId`.
|
||||||
///
|
///
|
||||||
/// This is a human readable room name.
|
/// This is a human readable room name.
|
||||||
pub async fn get_room_name(&self, room_id: &RoomId) -> Option<String> {
|
pub async fn get_room_name(&self, room_id: &RoomId) -> Option<String> {
|
||||||
self.base_client
|
self.base_client.calculate_room_name(room_id).await
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.calculate_room_name(room_id)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a `Vec` of the room names this client knows about.
|
/// Returns a `Vec` of the room names this client knows about.
|
||||||
///
|
///
|
||||||
/// This is a human readable list of room names.
|
/// This is a human readable list of room names.
|
||||||
pub async fn get_room_names(&self) -> Vec<String> {
|
pub async fn get_room_names(&self) -> Vec<String> {
|
||||||
self.base_client.read().await.calculate_room_names().await
|
self.base_client.calculate_room_names().await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the rooms this client knows about.
|
/// Returns the rooms this client knows about.
|
||||||
///
|
///
|
||||||
/// A `HashMap` of room id to `matrix::models::Room`
|
/// A `HashMap` of room id to `matrix::models::Room`
|
||||||
pub async fn get_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<tokio::sync::RwLock<Room>>>>> {
|
pub async fn get_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<tokio::sync::RwLock<Room>>>>> {
|
||||||
self.base_client.read().await.joined_rooms.clone()
|
self.base_client.joined_rooms.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This allows `AsyncClient` to manually sync state with the provided `StateStore`.
|
/// This allows `AsyncClient` to manually sync state with the provided `StateStore`.
|
||||||
|
@ -368,7 +360,7 @@ impl AsyncClient {
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn sync_with_state_store(&self) -> Result<bool> {
|
pub async fn sync_with_state_store(&self) -> Result<bool> {
|
||||||
self.base_client.write().await.sync_with_state_store().await
|
self.base_client.sync_with_state_store().await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Login to the server.
|
/// Login to the server.
|
||||||
|
@ -403,9 +395,7 @@ impl AsyncClient {
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request).await?;
|
||||||
let mut client = self.base_client.write().await;
|
self.base_client.receive_login_response(&response).await?;
|
||||||
|
|
||||||
client.receive_login_response(&response).await?;
|
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
@ -625,7 +615,7 @@ impl AsyncClient {
|
||||||
pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result<sync_events::Response> {
|
pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result<sync_events::Response> {
|
||||||
{
|
{
|
||||||
// if the client has been synced from the state store don't sync again
|
// if the client has been synced from the state store don't sync again
|
||||||
if !self.base_client.read().await.is_state_store_synced() {
|
if !self.base_client.is_state_store_synced() {
|
||||||
// this will bail out returning false if the store has not been set up
|
// this will bail out returning false if the store has not been set up
|
||||||
if let Ok(synced) = self.sync_with_state_store().await {
|
if let Ok(synced) = self.sync_with_state_store().await {
|
||||||
if synced {
|
if synced {
|
||||||
|
@ -646,8 +636,9 @@ impl AsyncClient {
|
||||||
|
|
||||||
let mut response = self.send(request).await?;
|
let mut response = self.send(request).await?;
|
||||||
|
|
||||||
let client = self.base_client.read().await;
|
self.base_client
|
||||||
client.receive_sync_response(&mut response).await?;
|
.receive_sync_response(&mut response)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
@ -734,7 +725,7 @@ impl AsyncClient {
|
||||||
|
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
{
|
{
|
||||||
if self.base_client.read().await.should_upload_keys().await {
|
if self.base_client.should_upload_keys().await {
|
||||||
let response = self.keys_upload().await;
|
let response = self.keys_upload().await;
|
||||||
|
|
||||||
if let Err(e) = response {
|
if let Err(e) = response {
|
||||||
|
@ -742,7 +733,7 @@ impl AsyncClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.base_client.read().await.should_query_keys().await {
|
if self.base_client.should_query_keys().await {
|
||||||
let response = self.keys_query().await;
|
let response = self.keys_query().await;
|
||||||
|
|
||||||
if let Err(e) = response {
|
if let Err(e) = response {
|
||||||
|
@ -807,8 +798,7 @@ impl AsyncClient {
|
||||||
};
|
};
|
||||||
|
|
||||||
let request_builder = if Request::METADATA.requires_authentication {
|
let request_builder = if Request::METADATA.requires_authentication {
|
||||||
let client = self.base_client.read().await;
|
let session = self.base_client.session.read().await;
|
||||||
let session = client.session.read().await;
|
|
||||||
|
|
||||||
if let Some(session) = session.as_ref() {
|
if let Some(session) = session.as_ref() {
|
||||||
request_builder.bearer_auth(&session.access_token)
|
request_builder.bearer_auth(&session.access_token)
|
||||||
|
@ -894,8 +884,7 @@ impl AsyncClient {
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
{
|
{
|
||||||
let encrypted = {
|
let encrypted = {
|
||||||
let client = self.base_client.read().await;
|
let room = self.base_client.get_room(room_id).await;
|
||||||
let room = client.get_room(room_id).await;
|
|
||||||
|
|
||||||
match room {
|
match room {
|
||||||
Some(r) => r.read().await.is_encrypted(),
|
Some(r) => r.read().await.is_encrypted(),
|
||||||
|
@ -905,40 +894,24 @@ impl AsyncClient {
|
||||||
|
|
||||||
if encrypted {
|
if encrypted {
|
||||||
let missing_sessions = {
|
let missing_sessions = {
|
||||||
let client = self.base_client.read().await;
|
let room = self.base_client.get_room(room_id).await;
|
||||||
let room = client.get_room(room_id).await;
|
|
||||||
let room = room.as_ref().unwrap().read().await;
|
let room = room.as_ref().unwrap().read().await;
|
||||||
let users = room.members.keys();
|
let users = room.members.keys();
|
||||||
self.base_client
|
self.base_client.get_missing_sessions(users).await?
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.get_missing_sessions(users)
|
|
||||||
.await?
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if !missing_sessions.is_empty() {
|
if !missing_sessions.is_empty() {
|
||||||
self.claim_one_time_keys(missing_sessions).await?;
|
self.claim_one_time_keys(missing_sessions).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if self
|
if self.base_client.should_share_group_session(room_id).await {
|
||||||
.base_client
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.should_share_group_session(room_id)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
// TODO we need to make sure that only one such request is
|
// TODO we need to make sure that only one such request is
|
||||||
// in flight per room at a time.
|
// in flight per room at a time.
|
||||||
self.share_group_session(room_id).await?;
|
self.share_group_session(room_id).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
raw_content = serde_json::value::to_raw_value(
|
raw_content = serde_json::value::to_raw_value(
|
||||||
&self
|
&self.base_client.encrypt(room_id, content).await?,
|
||||||
.base_client
|
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.encrypt(room_id, content)
|
|
||||||
.await?,
|
|
||||||
)?;
|
)?;
|
||||||
event_type = EventType::RoomEncrypted;
|
event_type = EventType::RoomEncrypted;
|
||||||
}
|
}
|
||||||
|
@ -979,8 +952,6 @@ impl AsyncClient {
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request).await?;
|
||||||
self.base_client
|
self.base_client
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.receive_keys_claim_response(&response)
|
.receive_keys_claim_response(&response)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(response)
|
Ok(response)
|
||||||
|
@ -1002,8 +973,6 @@ impl AsyncClient {
|
||||||
async fn share_group_session(&self, room_id: &RoomId) -> Result<()> {
|
async fn share_group_session(&self, room_id: &RoomId) -> Result<()> {
|
||||||
let mut requests = self
|
let mut requests = self
|
||||||
.base_client
|
.base_client
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.share_group_session(room_id)
|
.share_group_session(room_id)
|
||||||
.await
|
.await
|
||||||
.expect("Keys don't need to be uploaded");
|
.expect("Keys don't need to be uploaded");
|
||||||
|
@ -1030,8 +999,6 @@ impl AsyncClient {
|
||||||
async fn keys_upload(&self) -> Result<upload_keys::Response> {
|
async fn keys_upload(&self) -> Result<upload_keys::Response> {
|
||||||
let (device_keys, one_time_keys) = self
|
let (device_keys, one_time_keys) = self
|
||||||
.base_client
|
.base_client
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.keys_for_upload()
|
.keys_for_upload()
|
||||||
.await
|
.await
|
||||||
.expect("Keys don't need to be uploaded");
|
.expect("Keys don't need to be uploaded");
|
||||||
|
@ -1049,8 +1016,6 @@ impl AsyncClient {
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request).await?;
|
||||||
self.base_client
|
self.base_client
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.receive_keys_upload_response(&response)
|
.receive_keys_upload_response(&response)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(response)
|
Ok(response)
|
||||||
|
@ -1059,7 +1024,7 @@ impl AsyncClient {
|
||||||
/// Get the current, if any, sync token of the client.
|
/// Get the current, if any, sync token of the client.
|
||||||
/// This will be None if the client didn't sync at least once.
|
/// This will be None if the client didn't sync at least once.
|
||||||
pub async fn sync_token(&self) -> Option<String> {
|
pub async fn sync_token(&self) -> Option<String> {
|
||||||
self.base_client.read().await.sync_token().await
|
self.base_client.sync_token().await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Query the server for users device keys.
|
/// Query the server for users device keys.
|
||||||
|
@ -1073,8 +1038,6 @@ impl AsyncClient {
|
||||||
async fn keys_query(&self) -> Result<get_keys::Response> {
|
async fn keys_query(&self) -> Result<get_keys::Response> {
|
||||||
let mut users_for_query = self
|
let mut users_for_query = self
|
||||||
.base_client
|
.base_client
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.users_for_key_query()
|
.users_for_key_query()
|
||||||
.await
|
.await
|
||||||
.expect("Keys don't need to be uploaded");
|
.expect("Keys don't need to be uploaded");
|
||||||
|
@ -1098,8 +1061,6 @@ impl AsyncClient {
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request).await?;
|
||||||
self.base_client
|
self.base_client
|
||||||
.read()
|
|
||||||
.await
|
|
||||||
.receive_keys_query_response(&response)
|
.receive_keys_query_response(&response)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -63,6 +63,7 @@ pub type Token = String;
|
||||||
///
|
///
|
||||||
/// This Client is a state machine that receives responses and events and
|
/// This Client is a state machine that receives responses and events and
|
||||||
/// accordingly updates it's state.
|
/// accordingly updates it's state.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
/// The current client session containing our user id, device id and access
|
/// The current client session containing our user id, device id and access
|
||||||
/// token.
|
/// token.
|
||||||
|
@ -173,7 +174,7 @@ impl Client {
|
||||||
/// When a client is provided the state store will load state from the `StateStore`.
|
/// When a client is provided the state store will load state from the `StateStore`.
|
||||||
///
|
///
|
||||||
/// Returns `true` when a state store sync has successfully completed.
|
/// Returns `true` when a state store sync has successfully completed.
|
||||||
pub(crate) async fn sync_with_state_store(&mut self) -> Result<bool> {
|
pub(crate) async fn sync_with_state_store(&self) -> Result<bool> {
|
||||||
let store = self.state_store.read().await;
|
let store = self.state_store.read().await;
|
||||||
if let Some(store) = store.as_ref() {
|
if let Some(store) = store.as_ref() {
|
||||||
if let Some(sess) = self.session.read().await.as_ref() {
|
if let Some(sess) = self.session.read().await.as_ref() {
|
||||||
|
@ -193,12 +194,10 @@ impl Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut rooms = store.load_all_rooms().await?;
|
let mut rooms = store.load_all_rooms().await?;
|
||||||
self.joined_rooms = Arc::new(RwLock::new(
|
*self.joined_rooms.write().await = rooms
|
||||||
rooms
|
.drain()
|
||||||
.drain()
|
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
|
||||||
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
|
.collect();
|
||||||
.collect(),
|
|
||||||
));
|
|
||||||
|
|
||||||
self.needs_state_store_sync.store(false, Ordering::Relaxed);
|
self.needs_state_store_sync.store(false, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
@ -213,7 +212,7 @@ impl Client {
|
||||||
/// * `response` - A successful login response that contains our access token
|
/// * `response` - A successful login response that contains our access token
|
||||||
/// and device id.
|
/// and device id.
|
||||||
pub async fn receive_login_response(
|
pub async fn receive_login_response(
|
||||||
&mut self,
|
&self,
|
||||||
response: &api::session::login::Response,
|
response: &api::session::login::Response,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let session = Session {
|
let session = Session {
|
||||||
|
@ -939,7 +938,7 @@ mod test {
|
||||||
|
|
||||||
let _response = client.sync(sync_settings).await.unwrap();
|
let _response = client.sync(sync_settings).await.unwrap();
|
||||||
|
|
||||||
let bc = &client.base_client.read().await;
|
let bc = &client.base_client;
|
||||||
let ignored_users = bc.ignored_users.read().await;
|
let ignored_users = bc.ignored_users.read().await;
|
||||||
assert_eq!(1, ignored_users.len())
|
assert_eq!(1, ignored_users.len())
|
||||||
}
|
}
|
||||||
|
|
|
@ -493,7 +493,7 @@ mod test {
|
||||||
|
|
||||||
let _response = client.sync(sync_settings).await.unwrap();
|
let _response = client.sync(sync_settings).await.unwrap();
|
||||||
|
|
||||||
let rooms_lock = &client.base_client.read().await.joined_rooms;
|
let rooms_lock = &client.base_client.joined_rooms;
|
||||||
let rooms = rooms_lock.read().await;
|
let rooms = rooms_lock.read().await;
|
||||||
let room = &rooms
|
let room = &rooms
|
||||||
.get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap())
|
.get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap())
|
||||||
|
|
|
@ -287,7 +287,7 @@ mod test {
|
||||||
AsyncClient::new_with_config(homeserver, Some(session.clone()), config).unwrap();
|
AsyncClient::new_with_config(homeserver, Some(session.clone()), config).unwrap();
|
||||||
client.sync(sync_settings).await.unwrap();
|
client.sync(sync_settings).await.unwrap();
|
||||||
|
|
||||||
let base_client = client.base_client.read().await;
|
let base_client = &client.base_client;
|
||||||
|
|
||||||
// assert the synced client and the logged in client are equal
|
// assert the synced client and the logged in client are equal
|
||||||
assert_eq!(*base_client.session.read().await, Some(session));
|
assert_eq!(*base_client.session.read().await, Some(session));
|
||||||
|
|
|
@ -343,13 +343,11 @@ impl ClientTestRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stream_client_events(&mut self) {
|
async fn stream_client_events(&mut self) {
|
||||||
let cli = self
|
let cli = &self
|
||||||
.client
|
.client
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("`AsyncClient` must be set use `ClientTestRunner::set_client`")
|
.expect("`AsyncClient` must be set use `ClientTestRunner::set_client`")
|
||||||
.base_client
|
.base_client;
|
||||||
.write()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let room_id = &self.room_user_id.0;
|
let room_id = &self.room_user_id.0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue