fix example, use tokio::Mutex for all types that could be passed

in async thread spawn envs
master
Devin R 2020-03-31 21:08:25 -04:00
parent 5612825762
commit f2b50677b9
6 changed files with 71 additions and 63 deletions

View File

@ -1,5 +1,6 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, RwLock};
use std::{env, process::exit}; use std::{env, process::exit};
use std::ops::{Deref, DerefMut};
use url::Url; use url::Url;
use matrix_sdk::{ use matrix_sdk::{
@ -10,19 +11,21 @@ use matrix_sdk::{
}, },
AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings,
}; };
use tokio::sync::Mutex;
struct EventCallback; struct EventCallback;
#[async_trait::async_trait] #[async_trait::async_trait]
impl EventEmitter for EventCallback { impl EventEmitter for EventCallback {
async fn on_room_message(&mut self, room: &Room, event: &RoomEvent) { async fn on_room_message(&mut self, room: Arc<Mutex<Room>>, event: Arc<Mutex<RoomEvent>>) {
if let RoomEvent::RoomMessage(MessageEvent { if let RoomEvent::RoomMessage(MessageEvent {
content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }),
sender, sender,
.. ..
}) = event }) = event.lock().await.deref()
{ {
let member = room.members.get(&sender.to_string()).unwrap(); let rooms = room.lock().await;
let member = rooms.members.get(&sender.to_string()).unwrap();
println!( println!(
"{}: {}", "{}: {}",
member member
@ -48,7 +51,7 @@ async fn login(
let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap();
client client
.add_event_emitter(Arc::new(Mutex::new(EventCallback))) .add_event_emitter(Arc::new(Mutex::new(Box::new(EventCallback))))
.await; .await;
client client

View File

@ -41,6 +41,7 @@ use ruma_identifiers::{DeviceId, UserId};
use crate::api; use crate::api;
use crate::base_client::Client as BaseClient; use crate::base_client::Client as BaseClient;
use crate::models::Room;
use crate::session::Session; use crate::session::Session;
use crate::VERSION; use crate::VERSION;
use crate::{Error, EventEmitter, Result}; use crate::{Error, EventEmitter, Result};
@ -268,12 +269,17 @@ impl AsyncClient {
/// Calculates the room name from a `RoomId`, returning a string. /// Calculates the room name from a `RoomId`, returning a string.
pub async fn get_room_name(&self, room_id: &str) -> Option<String> { pub async fn get_room_name(&self, room_id: &str) -> Option<String> {
self.base_client.read().await.calculate_room_name(room_id) self.base_client.read().await.calculate_room_name(room_id).await
} }
/// Calculates the room names this client knows about. /// Calculates the room names this client knows about.
pub async fn get_room_names(&self) -> Vec<String> { pub async fn get_room_names(&self) -> Vec<String> {
self.base_client.read().await.calculate_room_names() self.base_client.read().await.calculate_room_names().await
}
/// Calculates the room names this client knows about.
pub async fn get_rooms(&self) -> HashMap<String, Arc<tokio::sync::Mutex<Room>>> {
self.base_client.read().await.joined_rooms.clone()
} }
/// Calculates the room that the client last interacted with. /// Calculates the room that the client last interacted with.
@ -347,7 +353,7 @@ impl AsyncClient {
let _matrix_room = { let _matrix_room = {
for event in &room.state.events { for event in &room.state.events {
if let EventResult::Ok(e) = event { if let EventResult::Ok(e) = event {
client.receive_joined_state_event(&room_id_string, &e); client.receive_joined_state_event(&room_id_string, &e).await;
} }
} }
@ -383,7 +389,7 @@ impl AsyncClient {
for account_data in &mut room.account_data.events { for account_data in &mut room.account_data.events {
{ {
if let EventResult::Ok(e) = account_data { if let EventResult::Ok(e) = account_data {
client.receive_account_data(&room_id_string, e); client.receive_account_data(&room_id_string, e).await;
// TODO should we determine if anything room state has changed before calling // TODO should we determine if anything room state has changed before calling
client.emit_account_data_event(room_id, e).await; client.emit_account_data_event(room_id, e).await;
@ -399,7 +405,7 @@ impl AsyncClient {
for presence in &mut response.presence.events { for presence in &mut response.presence.events {
{ {
if let EventResult::Ok(e) = presence { if let EventResult::Ok(e) = presence {
client.receive_presence_event(&room_id_string, e); client.receive_presence_event(&room_id_string, e).await;
// TODO should we determine if any room state has changed before calling // TODO should we determine if any room state has changed before calling
client.emit_presence_event(room_id, e).await; client.emit_presence_event(room_id, e).await;

View File

@ -99,7 +99,7 @@ pub struct Client {
/// The current sync token that should be used for the next sync call. /// The current sync token that should be used for the next sync call.
pub sync_token: Option<Token>, pub sync_token: Option<Token>,
/// A map of the rooms our user is joined in. /// A map of the rooms our user is joined in.
pub joined_rooms: HashMap<String, Arc<RwLock<Room>>>, pub joined_rooms: HashMap<String, Arc<Mutex<Room>>>,
/// The most recent room the logged in user used by `RoomId`. /// The most recent room the logged in user used by `RoomId`.
pub current_room_id: CurrentRoom, pub current_room_id: CurrentRoom,
/// A list of ignored users. /// A list of ignored users.
@ -186,34 +186,33 @@ impl Client {
Ok(()) Ok(())
} }
pub(crate) fn calculate_room_name(&self, room_id: &str) -> Option<String> { pub(crate) async fn calculate_room_name(&self, room_id: &str) -> Option<String> {
self.joined_rooms.get(room_id).and_then(|r| { if let Some(room) = self.joined_rooms.get(room_id) {
r.read() let room = room.lock().await;
.map(|r| r.room_name.calculate_name(room_id, &r.members)) Some(room.room_name.calculate_name(room_id, &room.members))
.ok() } else {
}) None
}
} }
pub(crate) fn calculate_room_names(&self) -> Vec<String> { pub(crate) async fn calculate_room_names(&self) -> Vec<String> {
self.joined_rooms let mut res = Vec::new();
.iter() for (id, room) in &self.joined_rooms {
.flat_map(|(id, room)| { let room = room.lock().await;
room.read() res.push(room.room_name.calculate_name(id, &room.members))
.map(|r| r.room_name.calculate_name(id, &r.members)) }
.ok() res
})
.collect()
} }
pub(crate) fn current_room_id(&self) -> Option<RoomId> { pub(crate) fn current_room_id(&self) -> Option<RoomId> {
self.current_room_id.current_room_id.clone() self.current_room_id.current_room_id.clone()
} }
pub(crate) fn get_or_create_room(&mut self, room_id: &str) -> &mut Arc<RwLock<Room>> { pub(crate) fn get_or_create_room(&mut self, room_id: &str) -> &mut Arc<Mutex<Room>> {
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
self.joined_rooms self.joined_rooms
.entry(room_id.to_string()) .entry(room_id.to_string())
.or_insert(Arc::new(RwLock::new(Room::new( .or_insert(Arc::new(Mutex::new(Room::new(
room_id, room_id,
&self &self
.session .session
@ -224,7 +223,7 @@ impl Client {
)))) ))))
} }
pub(crate) fn get_room(&self, room_id: &str) -> Option<&Arc<RwLock<Room>>> { pub(crate) fn get_room(&self, room_id: &str) -> Option<&Arc<Mutex<Room>>> {
self.joined_rooms.get(room_id) self.joined_rooms.get(room_id)
} }
@ -305,8 +304,8 @@ impl Client {
let mut room = self let mut room = self
.get_or_create_room(&room_id.to_string()) .get_or_create_room(&room_id.to_string())
.write() .lock()
.unwrap(); .await;
room.receive_timeline_event(e); room.receive_timeline_event(e);
decrypted_event decrypted_event
} }
@ -324,8 +323,8 @@ impl Client {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The event that should be handled by the client. /// * `event` - The event that should be handled by the client.
pub fn receive_joined_state_event(&mut self, room_id: &str, event: &StateEvent) -> bool { pub async fn receive_joined_state_event(&mut self, room_id: &str, event: &StateEvent) -> bool {
let mut room = self.get_or_create_room(room_id).write().unwrap(); let mut room = self.get_or_create_room(room_id).lock().await;
room.receive_state_event(event) room.receive_state_event(event)
} }
@ -339,7 +338,7 @@ impl Client {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The event that should be handled by the client. /// * `event` - The event that should be handled by the client.
pub fn receive_presence_event(&mut self, room_id: &str, event: &PresenceEvent) -> bool { pub async fn receive_presence_event(&mut self, room_id: &str, event: &PresenceEvent) -> bool {
let user_id = &self let user_id = &self
.session .session
.as_ref() .as_ref()
@ -351,7 +350,7 @@ impl Client {
} }
// this should be the room that was just created in the `Client::sync` loop. // this should be the room that was just created in the `Client::sync` loop.
if let Some(room) = self.get_room(room_id) { if let Some(room) = self.get_room(room_id) {
let mut room = room.write().unwrap(); let mut room = room.lock().await;
room.receive_presence_event(event) room.receive_presence_event(event)
} else { } else {
false false
@ -366,10 +365,10 @@ impl Client {
/// # Arguments /// # Arguments
/// ///
/// * `event` - The presence event for a specified room member. /// * `event` - The presence event for a specified room member.
pub fn receive_account_data(&mut self, room_id: &str, event: &NonRoomEvent) -> bool { pub async fn receive_account_data(&mut self, room_id: &str, event: &NonRoomEvent) -> bool {
match event { match event {
NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu), NonRoomEvent::IgnoredUserList(iu) => self.handle_ignored_users(iu),
NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p), NonRoomEvent::Presence(p) => self.receive_presence_event(room_id, p).await,
NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr), NonRoomEvent::PushRules(pr) => self.handle_push_rules(pr),
_ => false, _ => false,
} }

View File

@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use crate::events::collections::all::{RoomEvent, StateEvent}; use crate::events::collections::all::{RoomEvent, StateEvent};
use crate::events::collections::only::Event as NonRoomEvent; use crate::events::collections::only::Event as NonRoomEvent;
@ -26,51 +26,51 @@ use tokio::sync::Mutex;
pub trait EventEmitter: Send + Sync { pub trait EventEmitter: Send + Sync {
// ROOM EVENTS from `IncomingTimeline` // ROOM EVENTS from `IncomingTimeline`
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event.
async fn on_room_member(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_member(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event.
async fn on_room_name(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_name(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event.
async fn on_room_canonical_alias(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_canonical_alias(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event.
async fn on_room_aliases(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_aliases(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event.
async fn on_room_avatar(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_avatar(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event.
async fn on_room_message(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_message(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event.
async fn on_room_message_feedback(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_message_feedback(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event.
async fn on_room_redaction(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_redaction(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event. /// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event.
async fn on_room_power_levels(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {} async fn on_room_power_levels(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<RoomEvent>>) {}
// `RoomEvent`s from `IncomingState` // `RoomEvent`s from `IncomingState`
/// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event.
async fn on_state_member(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {} async fn on_state_member(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomName` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomName` event.
async fn on_state_name(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {} async fn on_state_name(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event.
async fn on_state_canonical_alias(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {} async fn on_state_canonical_alias(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event.
async fn on_state_aliases(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {} async fn on_state_aliases(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event.
async fn on_state_avatar(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {} async fn on_state_avatar(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event.
async fn on_state_power_levels(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {} async fn on_state_power_levels(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event. /// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event.
async fn on_state_join_rules(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {} async fn on_state_join_rules(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<StateEvent>>) {}
// `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData` // `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData`
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event.
async fn on_account_presence(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {} async fn on_account_presence(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event.
async fn on_account_ignored_users(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {} async fn on_account_ignored_users(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event.
async fn on_account_push_rules(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {} async fn on_account_push_rules(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event.
async fn on_account_data_fully_read(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {} async fn on_account_data_fully_read(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
// `PresenceEvent` is a struct so there is only the one method // `PresenceEvent` is a struct so there is only the one method
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event.
async fn on_presence_event(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<PresenceEvent>>) {} async fn on_presence_event(&mut self, _: Arc<Mutex<Room>>, _: Arc<Mutex<PresenceEvent>>) {}
} }

View File

@ -351,8 +351,8 @@ mod test {
let room = &rooms let room = &rooms
.get("!SVkFJHzfwvuaIEawgC:localhost") .get("!SVkFJHzfwvuaIEawgC:localhost")
.unwrap() .unwrap()
.read() .lock()
.unwrap(); .await;
assert_eq!(2, room.members.len()); assert_eq!(2, room.members.len());
for (_id, member) in &room.members { for (_id, member) in &room.members {

View File

@ -167,8 +167,8 @@ mod test {
let mut room = rooms let mut room = rooms
.get_mut("!SVkFJHzfwvuaIEawgC:localhost") .get_mut("!SVkFJHzfwvuaIEawgC:localhost")
.unwrap() .unwrap()
.write() .lock()
.unwrap(); .await;
for (_id, member) in &mut room.members { for (_id, member) in &mut room.members {
let power = power_levels(); let power = power_levels();