base_client: Make the base client a bit more threadsafe.

This moves the bulk of the sync logic into the base client, to avoid
deadlocks while someone tires to send messages from a event callback the
base client needed to get a bunch of locks.

Ideally the AsyncClient would not need a lock for the base client at all
but we're not there yet.
master
Damir Jelić 2020-05-06 13:57:58 +02:00
parent 39e59792d2
commit 967544bab9
6 changed files with 203 additions and 213 deletions

View File

@ -17,7 +17,6 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::ops::Deref;
use std::result::Result as StdResult; use std::result::Result as StdResult;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -339,7 +338,7 @@ impl AsyncClient {
/// Returns the rooms this client knows about. /// Returns the rooms this client knows about.
/// ///
/// A `HashMap` of room id to `matrix::models::Room` /// A `HashMap` of room id to `matrix::models::Room`
pub async fn get_rooms(&self) -> HashMap<RoomId, Arc<tokio::sync::RwLock<Room>>> { pub async fn get_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<tokio::sync::RwLock<Room>>>>> {
self.base_client.read().await.joined_rooms.clone() self.base_client.read().await.joined_rooms.clone()
} }
@ -645,109 +644,8 @@ impl AsyncClient {
let mut response = self.send(request).await?; let mut response = self.send(request).await?;
let mut updated = false; let client = self.base_client.read().await;
for (room_id, room) in &mut response.rooms.join { client.receive_sync_response(&mut response).await?;
let matrix_room = {
let mut client = self.base_client.write().await;
for event in &room.state.events {
if let Ok(e) = event.deserialize() {
if client.receive_joined_state_event(&room_id, &e).await {
updated = true;
}
}
}
client.get_or_create_room(&room_id).clone()
};
// RoomSummary contains information for calculating room name
matrix_room.write().await.set_room_summary(&room.summary);
// re looping is not ideal here
for event in &mut room.state.events {
if let Ok(e) = event.deserialize() {
let client = self.base_client.read().await;
client.emit_state_event(&room_id, &e).await;
}
}
for mut event in &mut room.timeline.events {
let decrypted_event = {
let mut client = self.base_client.write().await;
let (decrypt_ev, timeline_update) = client
.receive_joined_timeline_event(room_id, &mut event)
.await;
if timeline_update {
updated = true;
};
decrypt_ev
};
if let Some(e) = decrypted_event {
*event = e;
}
if let Ok(e) = event.deserialize() {
let client = self.base_client.read().await;
client.emit_timeline_event(&room_id, &e).await;
}
}
// look at AccountData to further cut down users by collecting ignored users
if let Some(account_data) = &room.account_data {
for account_data in &account_data.events {
{
if let Ok(e) = account_data.deserialize() {
let mut client = self.base_client.write().await;
if client.receive_account_data_event(&room_id, &e).await {
updated = true;
}
client.emit_account_data_event(room_id, &e).await;
}
}
}
}
// After the room has been created and state/timeline events accounted for we use the room_id of the newly created
// room to add any presence events that relate to a user in the current room. This is not super
// efficient but we need a room_id so we would loop through now or later.
for presence in &mut response.presence.events {
{
if let Ok(e) = presence.deserialize() {
let mut client = self.base_client.write().await;
if client.receive_presence_event(&room_id, &e).await {
updated = true;
}
client.emit_presence_event(&room_id, &e).await;
}
}
}
for ephemeral in &mut room.ephemeral.events {
{
if let Ok(e) = ephemeral.deserialize() {
let mut client = self.base_client.write().await;
if client.receive_ephemeral_event(&room_id, &e).await {
updated = true;
}
client.emit_ephemeral_event(&room_id, &e).await;
}
}
}
if updated {
if let Some(store) = self.base_client.read().await.state_store.as_ref() {
store
.store_room_state(matrix_room.read().await.deref())
.await?;
}
}
}
let mut client = self.base_client.write().await;
client.receive_sync_response(&mut response, updated).await?;
Ok(response) Ok(response)
} }
@ -994,7 +892,7 @@ impl AsyncClient {
{ {
let encrypted = { let encrypted = {
let client = self.base_client.read().await; let client = self.base_client.read().await;
let room = client.joined_rooms.get(room_id); let room = client.get_room(room_id).await;
match room { match room {
Some(r) => r.read().await.is_encrypted(), Some(r) => r.read().await.is_encrypted(),
@ -1005,7 +903,7 @@ impl AsyncClient {
if encrypted { if encrypted {
let missing_sessions = { let missing_sessions = {
let client = self.base_client.read().await; let client = self.base_client.read().await;
let room = client.joined_rooms.get(room_id); let room = client.get_room(room_id).await;
let room = room.as_ref().unwrap().read().await; let room = room.as_ref().unwrap().read().await;
let users = room.members.keys(); let users = room.members.keys();
self.base_client self.base_client
@ -1158,7 +1056,7 @@ impl AsyncClient {
/// Get the current, if any, sync token of the client. /// Get the current, if any, sync token of the client.
/// This will be None if the client didn't sync at least once. /// This will be None if the client didn't sync at least once.
pub async fn sync_token(&self) -> Option<String> { pub async fn sync_token(&self) -> Option<String> {
self.base_client.read().await.sync_token.clone() self.base_client.read().await.sync_token().await
} }
/// Query the server for users device keys. /// Query the server for users device keys.

View File

@ -17,6 +17,7 @@ use std::collections::HashMap;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
use std::fmt; use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
@ -37,6 +38,7 @@ use crate::session::Session;
use crate::state::{ClientState, StateStore}; use crate::state::{ClientState, StateStore};
use crate::EventEmitter; use crate::EventEmitter;
use std::ops::Deref;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -66,13 +68,13 @@ pub struct Client {
/// token. /// token.
pub session: Option<Session>, pub session: Option<Session>,
/// The current sync token that should be used for the next sync call. /// The current sync token that should be used for the next sync call.
pub sync_token: Option<Token>, pub sync_token: Arc<RwLock<Option<Token>>>,
/// A map of the rooms our user is joined in. /// A map of the rooms our user is joined in.
pub joined_rooms: HashMap<RoomId, Arc<RwLock<Room>>>, pub joined_rooms: Arc<RwLock<HashMap<RoomId, Arc<RwLock<Room>>>>>,
/// A list of ignored users. /// A list of ignored users.
pub ignored_users: Vec<UserId>, pub ignored_users: Arc<RwLock<Vec<UserId>>>,
/// The push ruleset for the logged in user. /// The push ruleset for the logged in user.
pub push_ruleset: Option<Ruleset>, pub push_ruleset: Arc<RwLock<Option<Ruleset>>>,
/// Any implementor of EventEmitter will act as the callbacks for various /// Any implementor of EventEmitter will act as the callbacks for various
/// events. /// events.
pub event_emitter: Option<Box<dyn EventEmitter>>, pub event_emitter: Option<Box<dyn EventEmitter>>,
@ -82,7 +84,7 @@ pub struct Client {
/// There is a default implementation `JsonStore` that saves JSON to disk. /// There is a default implementation `JsonStore` that saves JSON to disk.
pub state_store: Option<Box<dyn StateStore>>, pub state_store: Option<Box<dyn StateStore>>,
/// Does the `Client` need to sync with the state store. /// Does the `Client` need to sync with the state store.
needs_state_store_sync: bool, needs_state_store_sync: Arc<AtomicBool>,
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
olm: Arc<Mutex<Option<OlmMachine>>>, olm: Arc<Mutex<Option<OlmMachine>>>,
@ -117,13 +119,13 @@ impl Client {
Ok(Client { Ok(Client {
session, session,
sync_token: None, sync_token: Arc::new(RwLock::new(None)),
joined_rooms: HashMap::new(), joined_rooms: Arc::new(RwLock::new(HashMap::new())),
ignored_users: Vec::new(), ignored_users: Arc::new(RwLock::new(Vec::new())),
push_ruleset: None, push_ruleset: Arc::new(RwLock::new(None)),
event_emitter: None, event_emitter: None,
state_store: None, state_store: None,
needs_state_store_sync: true, needs_state_store_sync: Arc::new(AtomicBool::from(true)),
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
olm: Arc::new(Mutex::new(olm)), olm: Arc::new(Mutex::new(olm)),
}) })
@ -143,7 +145,7 @@ impl Client {
/// Returns true if the state store has been loaded into the client. /// Returns true if the state store has been loaded into the client.
pub fn is_state_store_synced(&self) -> bool { pub fn is_state_store_synced(&self) -> bool {
!self.needs_state_store_sync !self.needs_state_store_sync.load(Ordering::Relaxed)
} }
/// When a client is provided the state store will load state from the `StateStore`. /// When a client is provided the state store will load state from the `StateStore`.
@ -158,9 +160,9 @@ impl Client {
ignored_users, ignored_users,
push_ruleset, push_ruleset,
} = client_state; } = client_state;
self.sync_token = sync_token; *self.sync_token.write().await = sync_token;
self.ignored_users = ignored_users; *self.ignored_users.write().await = ignored_users;
self.push_ruleset = push_ruleset; *self.push_ruleset.write().await = push_ruleset;
} else { } else {
// return false and continues with a sync request then save the state and create // return false and continues with a sync request then save the state and create
// and populate the files during the sync // and populate the files during the sync
@ -168,15 +170,17 @@ impl Client {
} }
let mut rooms = store.load_all_rooms().await?; let mut rooms = store.load_all_rooms().await?;
self.joined_rooms = rooms self.joined_rooms = Arc::new(RwLock::new(
.drain() rooms
.map(|(k, room)| (k, Arc::new(RwLock::new(room)))) .drain()
.collect(); .map(|(k, room)| (k, Arc::new(RwLock::new(room))))
.collect(),
));
self.needs_state_store_sync = false; self.needs_state_store_sync.store(false, Ordering::Relaxed);
} }
} }
Ok(!self.needs_state_store_sync) Ok(!self.needs_state_store_sync.load(Ordering::Relaxed))
} }
/// Receive a login response and update the session of the client. /// Receive a login response and update the session of the client.
@ -206,7 +210,7 @@ impl Client {
} }
pub(crate) async fn calculate_room_name(&self, room_id: &RoomId) -> Option<String> { pub(crate) async fn calculate_room_name(&self, room_id: &RoomId) -> Option<String> {
if let Some(room) = self.joined_rooms.get(room_id) { if let Some(room) = self.joined_rooms.read().await.get(room_id) {
let room = room.read().await; let room = room.read().await;
Some(room.room_name.calculate_name(&room.members)) Some(room.room_name.calculate_name(&room.members))
} else { } else {
@ -216,16 +220,17 @@ impl Client {
pub(crate) async fn calculate_room_names(&self) -> Vec<String> { pub(crate) async fn calculate_room_names(&self) -> Vec<String> {
let mut res = Vec::new(); let mut res = Vec::new();
for room in self.joined_rooms.values() { for room in self.joined_rooms.read().await.values() {
let room = room.read().await; let room = room.read().await;
res.push(room.room_name.calculate_name(&room.members)) res.push(room.room_name.calculate_name(&room.members))
} }
res res
} }
pub(crate) fn get_or_create_room(&mut self, room_id: &RoomId) -> &mut Arc<RwLock<Room>> { pub(crate) async fn get_or_create_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> {
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
self.joined_rooms let mut rooms = self.joined_rooms.write().await;
rooms
.entry(room_id.clone()) .entry(room_id.clone())
.or_insert(Arc::new(RwLock::new(Room::new( .or_insert(Arc::new(RwLock::new(Room::new(
room_id, room_id,
@ -235,23 +240,24 @@ impl Client {
.expect("Receiving events while not being logged in") .expect("Receiving events while not being logged in")
.user_id, .user_id,
)))) ))))
.clone()
} }
pub(crate) fn get_room(&self, room_id: &RoomId) -> Option<&Arc<RwLock<Room>>> { pub(crate) async fn get_room(&self, room_id: &RoomId) -> Option<Arc<RwLock<Room>>> {
self.joined_rooms.get(room_id) self.joined_rooms.read().await.get(room_id).cloned()
} }
/// Handle a m.ignored_user_list event, updating the room state if necessary. /// Handle a m.ignored_user_list event, updating the room state if necessary.
/// ///
/// Returns true if the room name changed, false otherwise. /// Returns true if the room name changed, false otherwise.
pub(crate) fn handle_ignored_users(&mut self, event: &IgnoredUserListEvent) -> bool { pub(crate) async fn handle_ignored_users(&self, event: &IgnoredUserListEvent) -> bool {
// this avoids cloning every UserId for the eq check // this avoids cloning every UserId for the eq check
if self.ignored_users.iter().collect::<Vec<_>>() if self.ignored_users.read().await.iter().collect::<Vec<_>>()
== event.content.ignored_users.iter().collect::<Vec<_>>() == event.content.ignored_users.iter().collect::<Vec<_>>()
{ {
false false
} else { } else {
self.ignored_users = event.content.ignored_users.to_vec(); *self.ignored_users.write().await = event.content.ignored_users.to_vec();
true true
} }
} }
@ -259,7 +265,7 @@ impl Client {
/// Handle a m.ignored_user_list event, updating the room state if necessary. /// Handle a m.ignored_user_list event, updating the room state if necessary.
/// ///
/// Returns true if the room name changed, false otherwise. /// Returns true if the room name changed, false otherwise.
pub(crate) fn handle_push_rules(&mut self, event: &PushRulesEvent) -> bool { pub(crate) async fn handle_push_rules(&self, event: &PushRulesEvent) -> bool {
// TODO this is basically a stub // TODO this is basically a stub
// TODO ruma removed PartialEq for evens, so this doesn't work anymore. // TODO ruma removed PartialEq for evens, so this doesn't work anymore.
// Returning always true for now should be ok here since those don't // Returning always true for now should be ok here since those don't
@ -267,7 +273,7 @@ impl Client {
// if self.push_ruleset.as_ref() == Some(&event.content.global) { // if self.push_ruleset.as_ref() == Some(&event.content.global) {
// false // false
// } else { // } else {
self.push_ruleset = Some(event.content.global.clone()); *self.push_ruleset.write().await = Some(event.content.global.clone());
true true
// } // }
} }
@ -283,7 +289,7 @@ impl Client {
/// ///
/// * `event` - The event that should be handled by the client. /// * `event` - The event that should be handled by the client.
pub async fn receive_joined_timeline_event( pub async fn receive_joined_timeline_event(
&mut self, &self,
room_id: &RoomId, room_id: &RoomId,
event: &mut EventJson<RoomEvent>, event: &mut EventJson<RoomEvent>,
) -> (Option<EventJson<RoomEvent>>, bool) { ) -> (Option<EventJson<RoomEvent>>, bool) {
@ -307,7 +313,8 @@ impl Client {
} }
} }
let mut room = self.get_or_create_room(&room_id).write().await; let room_lock = self.get_or_create_room(&room_id).await;
let mut room = room_lock.write().await;
(decrypted_event, room.receive_timeline_event(&e)) (decrypted_event, room.receive_timeline_event(&e))
} }
_ => (None, false), _ => (None, false),
@ -324,12 +331,9 @@ impl Client {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The event that should be handled by the client. /// * `event` - The event that should be handled by the client.
pub async fn receive_joined_state_event( pub async fn receive_joined_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool {
&mut self, let room_lock = self.get_or_create_room(room_id).await;
room_id: &RoomId, let mut room = room_lock.write().await;
event: &StateEvent,
) -> bool {
let mut room = self.get_or_create_room(room_id).write().await;
room.receive_state_event(event) room.receive_state_event(event)
} }
@ -343,13 +347,9 @@ impl Client {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The event that should be handled by the client. /// * `event` - The event that should be handled by the client.
pub async fn receive_presence_event( pub async fn receive_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) -> bool {
&mut self,
room_id: &RoomId,
event: &PresenceEvent,
) -> bool {
// this should be the room that was just created in the `Client::sync` loop. // this should be the room that was just created in the `Client::sync` loop.
if let Some(room) = self.get_room(room_id) { if let Some(room) = self.get_room(room_id).await {
let mut room = room.write().await; let mut room = room.write().await;
room.receive_presence_event(event) room.receive_presence_event(event)
} else { } else {
@ -366,15 +366,11 @@ impl Client {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The presence event for a specified room member. /// * `event` - The presence event for a specified room member.
pub async fn receive_account_data_event( pub async fn receive_account_data_event(&self, room_id: &RoomId, event: &NonRoomEvent) -> bool {
&mut self,
room_id: &RoomId,
event: &NonRoomEvent,
) -> bool {
match event { match event {
NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu), NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu).await,
NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await, NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await,
NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr), NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr).await,
_ => false, _ => false,
} }
} }
@ -388,19 +384,21 @@ impl Client {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The presence event for a specified room member. /// * `event` - The presence event for a specified room member.
pub async fn receive_ephemeral_event( pub async fn receive_ephemeral_event(&self, room_id: &RoomId, event: &NonRoomEvent) -> bool {
&mut self,
room_id: &RoomId,
event: &NonRoomEvent,
) -> bool {
match event { match event {
NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu), NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu).await,
NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await, NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await,
NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr), NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr).await,
_ => false, _ => false,
} }
} }
/// Get the current, if any, sync token of the client.
/// This will be None if the client didn't sync at least once.
pub async fn sync_token(&self) -> Option<String> {
self.sync_token.read().await.clone()
}
/// Receive a response from a sync call. /// Receive a response from a sync call.
/// ///
/// # Arguments /// # Arguments
@ -409,11 +407,10 @@ impl Client {
/// ///
/// * `did_update` - Signals to the `StateStore` if the client state needs updating. /// * `did_update` - Signals to the `StateStore` if the client state needs updating.
pub async fn receive_sync_response( pub async fn receive_sync_response(
&mut self, &self,
response: &mut api::sync::sync_events::Response, response: &mut api::sync::sync_events::Response,
did_update: bool,
) -> Result<()> { ) -> Result<()> {
self.sync_token = Some(response.next_batch.clone()); *self.sync_token.write().await = Some(response.next_batch.clone());
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
{ {
@ -425,8 +422,8 @@ impl Client {
// TODO once the base client deals with callbacks move this into the // TODO once the base client deals with callbacks move this into the
// part where we already iterate through the rooms to avoid yet // part where we already iterate through the rooms to avoid yet
// another room loop. // another room loop.
for room in self.joined_rooms.values() { for room in self.joined_rooms.read().await.values() {
let room = room.write().await; let room = room.read().await;
if !room.is_encrypted() { if !room.is_encrypted() {
continue; continue;
} }
@ -436,9 +433,103 @@ impl Client {
} }
} }
if did_update { let mut updated = false;
for (room_id, room) in &mut response.rooms.join {
let matrix_room = {
for event in &room.state.events {
if let Ok(e) = event.deserialize() {
if self.receive_joined_state_event(&room_id, &e).await {
updated = true;
}
}
}
self.get_or_create_room(&room_id).await.clone()
};
// RoomSummary contains information for calculating room name
matrix_room.write().await.set_room_summary(&room.summary);
// re looping is not ideal here
for event in &mut room.state.events {
if let Ok(e) = event.deserialize() {
self.emit_state_event(&room_id, &e).await;
}
}
for mut event in &mut room.timeline.events {
let decrypted_event = {
let (decrypt_ev, timeline_update) = self
.receive_joined_timeline_event(room_id, &mut event)
.await;
if timeline_update {
updated = true;
};
decrypt_ev
};
if let Some(e) = decrypted_event {
*event = e;
}
if let Ok(e) = event.deserialize() {
self.emit_timeline_event(&room_id, &e).await;
}
}
// look at AccountData to further cut down users by collecting ignored users
if let Some(account_data) = &room.account_data {
for account_data in &account_data.events {
{
if let Ok(e) = account_data.deserialize() {
if self.receive_account_data_event(&room_id, &e).await {
updated = true;
}
self.emit_account_data_event(room_id, &e).await;
}
}
}
}
// After the room has been created and state/timeline events accounted for we use the room_id of the newly created
// room to add any presence events that relate to a user in the current room. This is not super
// efficient but we need a room_id so we would loop through now or later.
for presence in &mut response.presence.events {
{
if let Ok(e) = presence.deserialize() {
if self.receive_presence_event(&room_id, &e).await {
updated = true;
}
self.emit_presence_event(&room_id, &e).await;
}
}
}
for ephemeral in &mut room.ephemeral.events {
{
if let Ok(e) = ephemeral.deserialize() {
if self.receive_ephemeral_event(&room_id, &e).await {
updated = true;
}
self.emit_ephemeral_event(&room_id, &e).await;
}
}
}
if updated {
if let Some(store) = self.state_store.as_ref() {
store
.store_room_state(matrix_room.read().await.deref())
.await?;
}
}
}
if updated {
if let Some(store) = self.state_store.as_ref() { if let Some(store) = self.state_store.as_ref() {
let state = ClientState::from_base_client(&self); let state = ClientState::from_base_client(&self).await;
store.store_client_state(state).await?; store.store_client_state(state).await?;
} }
} }
@ -512,7 +603,7 @@ impl Client {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
) -> Result<Vec<send_event_to_device::Request>> { ) -> Result<Vec<send_event_to_device::Request>> {
let room = self.get_room(room_id).expect("No room found"); let room = self.get_room(room_id).await.expect("No room found");
let mut olm = self.olm.lock().await; let mut olm = self.olm.lock().await;
match &mut *olm { match &mut *olm {
@ -633,21 +724,21 @@ impl Client {
match event { match event {
RoomEvent::RoomMember(mem) => { RoomEvent::RoomMember(mem) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_member(Arc::clone(&room), &mem).await; ee.on_room_member(Arc::clone(&room), &mem).await;
} }
} }
} }
RoomEvent::RoomName(name) => { RoomEvent::RoomName(name) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_name(Arc::clone(&room), &name).await; ee.on_room_name(Arc::clone(&room), &name).await;
} }
} }
} }
RoomEvent::RoomCanonicalAlias(canonical) => { RoomEvent::RoomCanonicalAlias(canonical) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_canonical_alias(Arc::clone(&room), &canonical) ee.on_room_canonical_alias(Arc::clone(&room), &canonical)
.await; .await;
} }
@ -655,28 +746,28 @@ impl Client {
} }
RoomEvent::RoomAliases(aliases) => { RoomEvent::RoomAliases(aliases) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_aliases(Arc::clone(&room), &aliases).await; ee.on_room_aliases(Arc::clone(&room), &aliases).await;
} }
} }
} }
RoomEvent::RoomAvatar(avatar) => { RoomEvent::RoomAvatar(avatar) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_avatar(Arc::clone(&room), &avatar).await; ee.on_room_avatar(Arc::clone(&room), &avatar).await;
} }
} }
} }
RoomEvent::RoomMessage(msg) => { RoomEvent::RoomMessage(msg) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_message(Arc::clone(&room), &msg).await; ee.on_room_message(Arc::clone(&room), &msg).await;
} }
} }
} }
RoomEvent::RoomMessageFeedback(msg_feedback) => { RoomEvent::RoomMessageFeedback(msg_feedback) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_message_feedback(Arc::clone(&room), &msg_feedback) ee.on_room_message_feedback(Arc::clone(&room), &msg_feedback)
.await; .await;
} }
@ -684,21 +775,21 @@ impl Client {
} }
RoomEvent::RoomRedaction(redaction) => { RoomEvent::RoomRedaction(redaction) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_redaction(Arc::clone(&room), &redaction).await; ee.on_room_redaction(Arc::clone(&room), &redaction).await;
} }
} }
} }
RoomEvent::RoomPowerLevels(power) => { RoomEvent::RoomPowerLevels(power) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_power_levels(Arc::clone(&room), &power).await; ee.on_room_power_levels(Arc::clone(&room), &power).await;
} }
} }
} }
RoomEvent::RoomTombstone(tomb) => { RoomEvent::RoomTombstone(tomb) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_tombstone(Arc::clone(&room), &tomb).await; ee.on_room_tombstone(Arc::clone(&room), &tomb).await;
} }
} }
@ -711,21 +802,21 @@ impl Client {
match event { match event {
StateEvent::RoomMember(member) => { StateEvent::RoomMember(member) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_state_member(Arc::clone(&room), &member).await; ee.on_state_member(Arc::clone(&room), &member).await;
} }
} }
} }
StateEvent::RoomName(name) => { StateEvent::RoomName(name) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_state_name(Arc::clone(&room), &name).await; ee.on_state_name(Arc::clone(&room), &name).await;
} }
} }
} }
StateEvent::RoomCanonicalAlias(canonical) => { StateEvent::RoomCanonicalAlias(canonical) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_state_canonical_alias(Arc::clone(&room), &canonical) ee.on_state_canonical_alias(Arc::clone(&room), &canonical)
.await; .await;
} }
@ -733,35 +824,35 @@ impl Client {
} }
StateEvent::RoomAliases(aliases) => { StateEvent::RoomAliases(aliases) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_state_aliases(Arc::clone(&room), &aliases).await; ee.on_state_aliases(Arc::clone(&room), &aliases).await;
} }
} }
} }
StateEvent::RoomAvatar(avatar) => { StateEvent::RoomAvatar(avatar) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_state_avatar(Arc::clone(&room), &avatar).await; ee.on_state_avatar(Arc::clone(&room), &avatar).await;
} }
} }
} }
StateEvent::RoomPowerLevels(power) => { StateEvent::RoomPowerLevels(power) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_state_power_levels(Arc::clone(&room), &power).await; ee.on_state_power_levels(Arc::clone(&room), &power).await;
} }
} }
} }
StateEvent::RoomJoinRules(rules) => { StateEvent::RoomJoinRules(rules) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_state_join_rules(Arc::clone(&room), &rules).await; ee.on_state_join_rules(Arc::clone(&room), &rules).await;
} }
} }
} }
StateEvent::RoomTombstone(tomb) => { StateEvent::RoomTombstone(tomb) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_room_tombstone(Arc::clone(&room), &tomb).await; ee.on_room_tombstone(Arc::clone(&room), &tomb).await;
} }
} }
@ -774,14 +865,14 @@ impl Client {
match event { match event {
NonRoomEvent::Presence(presence) => { NonRoomEvent::Presence(presence) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_presence(Arc::clone(&room), &presence).await; ee.on_account_presence(Arc::clone(&room), &presence).await;
} }
} }
} }
NonRoomEvent::IgnoredUserList(ignored) => { NonRoomEvent::IgnoredUserList(ignored) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_ignored_users(Arc::clone(&room), &ignored) ee.on_account_ignored_users(Arc::clone(&room), &ignored)
.await; .await;
} }
@ -789,14 +880,14 @@ impl Client {
} }
NonRoomEvent::PushRules(rules) => { NonRoomEvent::PushRules(rules) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_push_rules(Arc::clone(&room), &rules).await; ee.on_account_push_rules(Arc::clone(&room), &rules).await;
} }
} }
} }
NonRoomEvent::FullyRead(full_read) => { NonRoomEvent::FullyRead(full_read) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_data_fully_read(Arc::clone(&room), &full_read) ee.on_account_data_fully_read(Arc::clone(&room), &full_read)
.await; .await;
} }
@ -810,14 +901,14 @@ impl Client {
match event { match event {
NonRoomEvent::Presence(presence) => { NonRoomEvent::Presence(presence) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_presence(Arc::clone(&room), &presence).await; ee.on_account_presence(Arc::clone(&room), &presence).await;
} }
} }
} }
NonRoomEvent::IgnoredUserList(ignored) => { NonRoomEvent::IgnoredUserList(ignored) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_ignored_users(Arc::clone(&room), &ignored) ee.on_account_ignored_users(Arc::clone(&room), &ignored)
.await; .await;
} }
@ -825,14 +916,14 @@ impl Client {
} }
NonRoomEvent::PushRules(rules) => { NonRoomEvent::PushRules(rules) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_push_rules(Arc::clone(&room), &rules).await; ee.on_account_push_rules(Arc::clone(&room), &rules).await;
} }
} }
} }
NonRoomEvent::FullyRead(full_read) => { NonRoomEvent::FullyRead(full_read) => {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_account_data_fully_read(Arc::clone(&room), &full_read) ee.on_account_data_fully_read(Arc::clone(&room), &full_read)
.await; .await;
} }
@ -844,7 +935,7 @@ impl Client {
pub(crate) async fn emit_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) { pub(crate) async fn emit_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) {
if let Some(ee) = &self.event_emitter { if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) { if let Some(room) = self.get_room(&room_id).await {
ee.on_presence_event(Arc::clone(&room), &event).await; ee.on_presence_event(Arc::clone(&room), &event).await;
} }
} }
@ -889,6 +980,7 @@ mod test {
let _response = client.sync(sync_settings).await.unwrap(); let _response = client.sync(sync_settings).await.unwrap();
let bc = &client.base_client.read().await; let bc = &client.base_client.read().await;
assert_eq!(1, bc.ignored_users.len()) let ignored_users = bc.ignored_users.read().await;
assert_eq!(1, ignored_users.len())
} }
} }

View File

@ -493,7 +493,8 @@ mod test {
let _response = client.sync(sync_settings).await.unwrap(); let _response = client.sync(sync_settings).await.unwrap();
let rooms = &client.base_client.read().await.joined_rooms; let rooms_lock = &client.base_client.read().await.joined_rooms;
let rooms = rooms_lock.read().await;
let room = &rooms let room = &rooms
.get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap()) .get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap())
.unwrap() .unwrap()

View File

@ -13,13 +13,12 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
pub mod state_store; pub mod state_store;
pub use state_store::JsonStore; pub use state_store::JsonStore;
use serde::{Deserialize, Serialize};
use crate::base_client::{Client as BaseClient, Token}; use crate::base_client::{Client as BaseClient, Token};
use crate::events::push_rules::Ruleset; use crate::events::push_rules::Ruleset;
use crate::identifiers::{RoomId, UserId}; use crate::identifiers::{RoomId, UserId};
@ -48,7 +47,7 @@ impl PartialEq for ClientState {
} }
impl ClientState { impl ClientState {
pub fn from_base_client(client: &BaseClient) -> ClientState { pub async fn from_base_client(client: &BaseClient) -> ClientState {
let BaseClient { let BaseClient {
sync_token, sync_token,
ignored_users, ignored_users,
@ -56,9 +55,9 @@ impl ClientState {
.. ..
} = client; } = client;
Self { Self {
sync_token: sync_token.clone(), sync_token: sync_token.read().await.clone(),
ignored_users: ignored_users.clone(), ignored_users: ignored_users.read().await.clone(),
push_ruleset: push_ruleset.clone(), push_ruleset: push_ruleset.read().await.clone(),
} }
} }
} }

View File

@ -292,11 +292,11 @@ mod test {
// assert the synced client and the logged in client are equal // assert the synced client and the logged in client are equal
assert_eq!(base_client.session, Some(session)); assert_eq!(base_client.session, Some(session));
assert_eq!( assert_eq!(
base_client.sync_token, base_client.sync_token().await,
Some("s526_47314_0_7_1_1_1_11444_1".to_string()) Some("s526_47314_0_7_1_1_1_11444_1".to_string())
); );
assert_eq!( assert_eq!(
base_client.ignored_users, *base_client.ignored_users.read().await,
vec![UserId::try_from("@someone:example.org").unwrap()] vec![UserId::try_from("@someone:example.org").unwrap()]
); );
} }

View File

@ -343,7 +343,7 @@ impl ClientTestRunner {
} }
async fn stream_client_events(&mut self) { async fn stream_client_events(&mut self) {
let mut cli = self let cli = self
.client .client
.as_ref() .as_ref()
.expect("`AsyncClient` must be set use `ClientTestRunner::set_client`") .expect("`AsyncClient` must be set use `ClientTestRunner::set_client`")