From 5f2269f12fa82cbfca8b1d157aba996c9eb868dd Mon Sep 17 00:00:00 2001 From: Devin R Date: Tue, 14 Apr 2020 08:39:51 -0400 Subject: [PATCH 01/11] party bot example responds to !party --- examples/command_bot.rs | 129 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 examples/command_bot.rs diff --git a/examples/command_bot.rs b/examples/command_bot.rs new file mode 100644 index 00000000..d64c911f --- /dev/null +++ b/examples/command_bot.rs @@ -0,0 +1,129 @@ +use std::ops::Deref; +use std::sync::Arc; +use std::{env, process::exit}; +use url::Url; + +use matrix_sdk::{ + self, + events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, + identifiers::RoomId, + AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, +}; +use tokio::runtime::Handle; +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, +}; + +struct CommandBot { + send: Sender<(RoomId, String)>, +} + +impl CommandBot { + pub fn new(send: Sender<(RoomId, String)>) -> Self { + Self { send } + } +} + +#[async_trait::async_trait] +impl EventEmitter for CommandBot { + async fn on_room_message(&mut self, room: Arc>, event: Arc>) { + if let MessageEvent { + content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), + sender, + .. + } = event.lock().await.deref() + { + let room = room.lock().await; + if msg_body.contains("!party") { + self.send + .send((room.room_id.clone(), "let's PARTY!! 🥳🎊🎉".to_string())) + .await + .unwrap() + } + } + } +} + +#[allow(clippy::for_loop_over_option)] +async fn login_and_sync( + homeserver_url: String, + username: String, + password: String, + exec: Handle, +) -> Result<(), matrix_sdk::Error> { + let client_config = AsyncClientConfig::new(); + // .proxy("http://localhost:8080")? + // .disable_ssl_verification(); + let homeserver_url = Url::parse(&homeserver_url)?; + let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); + + let (send, mut recv) = mpsc::channel(100); + + client + .add_event_emitter(Arc::new(Mutex::new(Box::new(CommandBot::new(send))))) + .await; + + client + .login( + username.clone(), + password, + None, + Some("command bot".to_string()), + ) + .await?; + + println!("logged in as user {}", username); + + let client = Arc::new(Mutex::new(client)); + let send_client = Arc::clone(&client); + + exec.spawn(async move { + for (id, msg) in recv.recv().await { + let content = MessageEventContent::Text(TextMessageEventContent { + body: msg, + format: None, + formatted_body: None, + relates_to: None, + }); + send_client + .lock() + .await + .room_send(&id, content) + .await + .unwrap(); + } + }); + + client + .lock() + .await + .sync_forever(SyncSettings::new(), |_| async {}) + .await; + + Ok(()) +} + +fn main() -> Result<(), matrix_sdk::Error> { + let (homeserver_url, username, password) = + match (env::args().nth(1), env::args().nth(2), env::args().nth(3)) { + (Some(a), Some(b), Some(c)) => (a, b, c), + _ => { + eprintln!( + "Usage: {} ", + env::args().next().unwrap() + ); + exit(1) + } + }; + + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + + let executor = runtime.handle().clone(); + runtime.block_on(async { login_and_sync(homeserver_url, username, password, executor).await }) +} From 6f2b5194d13ebf29a691f1e798388d4b445a0898 Mon Sep 17 00:00:00 2001 From: Devin R Date: Tue, 14 Apr 2020 08:45:40 -0400 Subject: [PATCH 02/11] remove unused imports/vars --- examples/command_bot.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index d64c911f..333ab28e 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -11,7 +11,7 @@ use matrix_sdk::{ }; use tokio::runtime::Handle; use tokio::sync::{ - mpsc::{self, Receiver, Sender}, + mpsc::{self, Sender}, Mutex, }; @@ -30,14 +30,16 @@ impl EventEmitter for CommandBot { async fn on_room_message(&mut self, room: Arc>, event: Arc>) { if let MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), - sender, .. } = event.lock().await.deref() { let room = room.lock().await; if msg_body.contains("!party") { self.send - .send((room.room_id.clone(), "let's PARTY!! 🥳🎊🎉".to_string())) + .send(( + room.room_id.clone(), + "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(), + )) .await .unwrap() } @@ -89,7 +91,7 @@ async fn login_and_sync( send_client .lock() .await - .room_send(&id, content) + .room_send(&id, content, None) .await .unwrap(); } From 87c9dbdad73b493866c9e036df737e89f3b0636f Mon Sep 17 00:00:00 2001 From: Devin R Date: Tue, 14 Apr 2020 14:49:29 -0400 Subject: [PATCH 03/11] clone client --- examples/command_bot.rs | 80 +++++++++++++---------------------------- 1 file changed, 24 insertions(+), 56 deletions(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index 333ab28e..fdfada22 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -6,22 +6,17 @@ use url::Url; use matrix_sdk::{ self, events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, - identifiers::RoomId, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, }; -use tokio::runtime::Handle; -use tokio::sync::{ - mpsc::{self, Sender}, - Mutex, -}; +use tokio::sync::Mutex; struct CommandBot { - send: Sender<(RoomId, String)>, + client: AsyncClient, } impl CommandBot { - pub fn new(send: Sender<(RoomId, String)>) -> Self { - Self { send } + pub fn new(client: AsyncClient) -> Self { + Self { client } } } @@ -35,13 +30,18 @@ impl EventEmitter for CommandBot { { let room = room.lock().await; if msg_body.contains("!party") { - self.send - .send(( - room.room_id.clone(), - "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(), - )) + println!("!party found"); + let content = MessageEventContent::Text(TextMessageEventContent { + body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(), + format: None, + formatted_body: None, + relates_to: None, + }); + self.client + .room_send(&room.room_id, content, None) .await - .unwrap() + .unwrap(); + println!("message sent"); } } } @@ -52,7 +52,6 @@ async fn login_and_sync( homeserver_url: String, username: String, password: String, - exec: Handle, ) -> Result<(), matrix_sdk::Error> { let client_config = AsyncClientConfig::new(); // .proxy("http://localhost:8080")? @@ -60,10 +59,10 @@ async fn login_and_sync( let homeserver_url = Url::parse(&homeserver_url)?; let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); - let (send, mut recv) = mpsc::channel(100); - client - .add_event_emitter(Arc::new(Mutex::new(Box::new(CommandBot::new(send))))) + .add_event_emitter(Arc::new(Mutex::new(Box::new(CommandBot::new( + client.clone(), + ))))) .await; client @@ -75,38 +74,15 @@ async fn login_and_sync( ) .await?; - println!("logged in as user {}", username); + println!("logged in as {}", username); - let client = Arc::new(Mutex::new(client)); - let send_client = Arc::clone(&client); - - exec.spawn(async move { - for (id, msg) in recv.recv().await { - let content = MessageEventContent::Text(TextMessageEventContent { - body: msg, - format: None, - formatted_body: None, - relates_to: None, - }); - send_client - .lock() - .await - .room_send(&id, content, None) - .await - .unwrap(); - } - }); - - client - .lock() - .await - .sync_forever(SyncSettings::new(), |_| async {}) - .await; + client.sync(SyncSettings::new()).await.unwrap(); Ok(()) } -fn main() -> Result<(), matrix_sdk::Error> { +#[tokio::main] +async fn main() -> Result<(), matrix_sdk::Error> { let (homeserver_url, username, password) = match (env::args().nth(1), env::args().nth(2), env::args().nth(3)) { (Some(a), Some(b), Some(c)) => (a, b, c), @@ -118,14 +94,6 @@ fn main() -> Result<(), matrix_sdk::Error> { exit(1) } }; - - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .threaded_scheduler() - .enable_all() - .build() - .unwrap(); - - let executor = runtime.handle().clone(); - runtime.block_on(async { login_and_sync(homeserver_url, username, password, executor).await }) + login_and_sync(homeserver_url, username, password).await?; + Ok(()) } From a5ab7d97dac03ff95f2d113e348e31e900034e33 Mon Sep 17 00:00:00 2001 From: Devin R Date: Tue, 14 Apr 2020 15:16:20 -0400 Subject: [PATCH 04/11] use AsyncClient::sync_forever --- examples/command_bot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index fdfada22..aaaa6b81 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -76,7 +76,7 @@ async fn login_and_sync( println!("logged in as {}", username); - client.sync(SyncSettings::new()).await.unwrap(); + client.sync_forever(SyncSettings::new(), |_| async {}).await; Ok(()) } From cf029b2e4f998829f50e8178cb20fad3bfda4d93 Mon Sep 17 00:00:00 2001 From: Devin R Date: Tue, 14 Apr 2020 18:10:10 -0400 Subject: [PATCH 05/11] make EventEmitter shared refs, in AsyncClient::sync lock only in inner most scope --- examples/command_bot.rs | 80 +++++++++----- examples/login.rs | 11 +- src/async_client.rs | 22 ++-- src/base_client.rs | 218 +++++++++------------------------------ src/event_emitter/mod.rs | 171 +++++++++--------------------- src/models/room.rs | 2 +- 6 files changed, 165 insertions(+), 339 deletions(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index aaaa6b81..222b4e34 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -8,41 +8,55 @@ use matrix_sdk::{ events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, }; +use tokio::runtime::{Handle, Runtime}; +use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::Mutex; struct CommandBot { - client: AsyncClient, + client: Mutex, + // sender: Sender<(RoomId, MessageEventContent)> } impl CommandBot { pub fn new(client: AsyncClient) -> Self { - Self { client } + Self { + client: Mutex::new(client), + } } } #[async_trait::async_trait] impl EventEmitter for CommandBot { - async fn on_room_message(&mut self, room: Arc>, event: Arc>) { - if let MessageEvent { + async fn on_room_message(&self, room: &Room, event: &MessageEvent) { + let msg_body = if let MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), .. - } = event.lock().await.deref() + } = event { - let room = room.lock().await; - if msg_body.contains("!party") { - println!("!party found"); - let content = MessageEventContent::Text(TextMessageEventContent { - body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(), - format: None, - formatted_body: None, - relates_to: None, - }); - self.client - .room_send(&room.room_id, content, None) - .await - .unwrap(); - println!("message sent"); - } + msg_body.clone() + } else { + String::new() + }; + + if msg_body.contains("!party") { + let content = MessageEventContent::Text(TextMessageEventContent { + body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(), + format: None, + formatted_body: None, + relates_to: None, + }); + let room_id = &room.room_id; + + println!("sending"); + + self.client + .lock() + .await + .room_send(&room_id, content, None) + .await + .unwrap(); + + println!("message sent"); } } } @@ -52,6 +66,7 @@ async fn login_and_sync( homeserver_url: String, username: String, password: String, + exec: Handle, ) -> Result<(), matrix_sdk::Error> { let client_config = AsyncClientConfig::new(); // .proxy("http://localhost:8080")? @@ -60,9 +75,7 @@ async fn login_and_sync( let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); client - .add_event_emitter(Arc::new(Mutex::new(Box::new(CommandBot::new( - client.clone(), - ))))) + .add_event_emitter(Box::new(CommandBot::new(client.clone()))) .await; client @@ -76,13 +89,16 @@ async fn login_and_sync( println!("logged in as {}", username); - client.sync_forever(SyncSettings::new(), |_| async {}).await; + exec.spawn(async move { + client.sync_forever(SyncSettings::new(), |_| async {}).await; + }) + .await + .unwrap(); Ok(()) } -#[tokio::main] -async fn main() -> Result<(), matrix_sdk::Error> { +fn main() -> Result<(), matrix_sdk::Error> { let (homeserver_url, username, password) = match (env::args().nth(1), env::args().nth(2), env::args().nth(3)) { (Some(a), Some(b), Some(c)) => (a, b, c), @@ -94,6 +110,16 @@ async fn main() -> Result<(), matrix_sdk::Error> { exit(1) } }; - login_and_sync(homeserver_url, username, password).await?; + + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + + let exec = runtime.handle().clone(); + + runtime.block_on(async { login_and_sync(homeserver_url, username, password, exec).await })?; Ok(()) } diff --git a/examples/login.rs b/examples/login.rs index e69a9087..34f943a2 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -14,15 +14,14 @@ struct EventCallback; #[async_trait::async_trait] impl EventEmitter for EventCallback { - async fn on_room_message(&mut self, room: Arc>, event: Arc>) { + async fn on_room_message(&self, room: &Room, event: &MessageEvent) { if let MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), sender, .. - } = event.lock().await.deref() + } = event { - let rooms = room.lock().await; - let member = rooms.members.get(&sender).unwrap(); + let member = room.members.get(&sender).unwrap(); println!( "{}: {}", member.display_name.as_ref().unwrap_or(&sender.to_string()), @@ -43,9 +42,7 @@ async fn login( let homeserver_url = Url::parse(&homeserver_url)?; let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); - client - .add_event_emitter(Arc::new(Mutex::new(Box::new(EventCallback)))) - .await; + client.add_event_emitter(Box::new(EventCallback)).await; client .login(username, password, None, Some("rust-sdk".to_string())) diff --git a/src/async_client.rs b/src/async_client.rs index 718e423b..d52b724c 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -273,10 +273,7 @@ impl AsyncClient { /// Add `EventEmitter` to `AsyncClient`. /// /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. - pub async fn add_event_emitter( - &mut self, - emitter: Arc>>, - ) { + pub async fn add_event_emitter(&mut self, emitter: Box) { self.base_client.write().await.event_emitter = Some(emitter); } @@ -301,7 +298,7 @@ impl AsyncClient { /// Returns the rooms this client knows about. /// /// A `HashMap` of room id to `matrix::models::Room` - pub async fn get_rooms(&self) -> HashMap>> { + pub async fn get_rooms(&self) -> HashMap>> { self.base_client.read().await.joined_rooms.clone() } @@ -565,9 +562,8 @@ impl AsyncClient { let mut response = self.send(request).await?; for (room_id, room) in &mut response.rooms.join { - let mut client = self.base_client.write().await; - let _matrix_room = { + let mut client = self.base_client.write().await; for event in &room.state.events { if let EventResult::Ok(e) = event { client.receive_joined_state_event(&room_id, &e).await; @@ -580,12 +576,14 @@ impl AsyncClient { // re looping is not ideal here for event in &mut room.state.events { if let EventResult::Ok(e) = event { + let client = self.base_client.read().await; client.emit_state_event(room_id, e).await; } } for mut event in &mut room.timeline.events { let decrypted_event = { + let mut client = self.base_client.write().await; client .receive_joined_timeline_event(room_id, &mut event) .await @@ -596,6 +594,7 @@ impl AsyncClient { } if let EventResult::Ok(e) = event { + let client = self.base_client.read().await; client.emit_timeline_event(room_id, e).await; } } @@ -604,8 +603,8 @@ impl AsyncClient { for account_data in &mut room.account_data.events { { if let EventResult::Ok(e) = account_data { + let mut client = self.base_client.write().await; client.receive_account_data_event(&room_id, e).await; - client.emit_account_data_event(room_id, e).await; } } @@ -617,6 +616,7 @@ impl AsyncClient { for presence in &mut response.presence.events { { if let EventResult::Ok(e) = presence { + let mut client = self.base_client.write().await; client.receive_presence_event(&room_id, e).await; client.emit_presence_event(room_id, e).await; @@ -627,6 +627,7 @@ impl AsyncClient { for ephemeral in &mut room.ephemeral.events { { if let EventResult::Ok(e) = ephemeral { + let mut client = self.base_client.write().await; client.receive_ephemeral_event(&room_id, e).await; client.emit_ephemeral_event(room_id, e).await; @@ -810,7 +811,6 @@ impl AsyncClient { } else { request_builder }; - let mut response = request_builder.send().await?; trace!("Got response: {:?}", response); @@ -892,7 +892,7 @@ impl AsyncClient { let room = client.joined_rooms.get(room_id); match room { - Some(r) => r.lock().await.is_encrypted(), + Some(r) => r.write().await.is_encrypted(), None => false, } }; @@ -901,7 +901,7 @@ impl AsyncClient { let missing_sessions = { let client = self.base_client.read().await; let room = client.joined_rooms.get(room_id); - let room = room.as_ref().unwrap().lock().await; + let room = room.as_ref().unwrap().write().await; let users = room.members.keys(); self.base_client .read() diff --git a/src/base_client.rs b/src/base_client.rs index 12164686..5c8a5bf3 100644 --- a/src/base_client.rs +++ b/src/base_client.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; #[cfg(feature = "encryption")] use std::collections::HashSet; use std::fmt; +use std::ops::Deref; use std::sync::Arc; #[cfg(feature = "encryption")] @@ -36,7 +37,9 @@ use crate::models::Room; use crate::session::Session; use crate::EventEmitter; +#[cfg(feature = "encryption")] use tokio::sync::Mutex; +use tokio::sync::RwLock; #[cfg(feature = "encryption")] use crate::crypto::{OlmMachine, OneTimeKeys}; @@ -65,14 +68,14 @@ pub struct Client { /// The current sync token that should be used for the next sync call. pub sync_token: Option, /// A map of the rooms our user is joined in. - pub joined_rooms: HashMap>>, + pub joined_rooms: HashMap>>, /// A list of ignored users. pub ignored_users: Vec, /// The push ruleset for the logged in user. pub push_ruleset: Option, /// Any implementor of EventEmitter will act as the callbacks for various /// events. - pub event_emitter: Option>>>, + pub event_emitter: Option>, #[cfg(feature = "encryption")] olm: Arc>>, @@ -125,10 +128,7 @@ impl Client { /// Add `EventEmitter` to `Client`. /// /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. - pub async fn add_event_emitter( - &mut self, - emitter: Arc>>, - ) { + pub async fn add_event_emitter(&mut self, emitter: Box) { self.event_emitter = Some(emitter); } @@ -160,7 +160,7 @@ impl Client { pub(crate) async fn calculate_room_name(&self, room_id: &RoomId) -> Option { if let Some(room) = self.joined_rooms.get(room_id) { - let room = room.lock().await; + let room = room.read().await; Some(room.room_name.calculate_name(room_id, &room.members)) } else { None @@ -170,17 +170,17 @@ impl Client { pub(crate) async fn calculate_room_names(&self) -> Vec { let mut res = Vec::new(); for (id, room) in &self.joined_rooms { - let room = room.lock().await; + let room = room.read().await; res.push(room.room_name.calculate_name(id, &room.members)) } res } - pub(crate) fn get_or_create_room(&mut self, room_id: &RoomId) -> &mut Arc> { + pub(crate) fn get_or_create_room(&mut self, room_id: &RoomId) -> &mut Arc> { #[allow(clippy::or_fun_call)] self.joined_rooms .entry(room_id.clone()) - .or_insert(Arc::new(Mutex::new(Room::new( + .or_insert(Arc::new(RwLock::new(Room::new( room_id, &self .session @@ -190,7 +190,7 @@ impl Client { )))) } - pub(crate) fn get_room(&self, room_id: &RoomId) -> Option<&Arc>> { + pub(crate) fn get_room(&self, room_id: &RoomId) -> Option<&Arc>> { self.joined_rooms.get(room_id) } @@ -259,7 +259,7 @@ impl Client { } } - let mut room = self.get_or_create_room(&room_id).lock().await; + let mut room = self.get_or_create_room(&room_id).write().await; room.receive_timeline_event(e); decrypted_event } @@ -282,7 +282,7 @@ impl Client { room_id: &RoomId, event: &StateEvent, ) -> bool { - let mut room = self.get_or_create_room(room_id).lock().await; + let mut room = self.get_or_create_room(room_id).write().await; room.receive_state_event(event) } @@ -303,7 +303,7 @@ impl Client { ) -> bool { // this should be the room that was just created in the `Client::sync` loop. if let Some(room) = self.get_room(room_id) { - let mut room = room.lock().await; + let mut room = room.write().await; room.receive_presence_event(event) } else { false @@ -376,7 +376,7 @@ impl Client { // part where we already iterate through the rooms to avoid yet // another room loop. for room in self.joined_rooms.values() { - let room = room.lock().await; + let room = room.write().await; if !room.is_encrypted() { continue; } @@ -459,7 +459,7 @@ impl Client { match &mut *olm { Some(o) => { - let room = room.lock().await; + let room = room.write().await; let members = room.members.keys(); Ok(o.share_group_session(room_id, members).await?) } @@ -573,37 +573,26 @@ impl Client { Ok(()) } - pub(crate) async fn emit_timeline_event(&mut self, room_id: &RoomId, event: &mut RoomEvent) { + pub(crate) async fn emit_timeline_event(&self, room_id: &RoomId, event: &RoomEvent) { match event { RoomEvent::RoomMember(mem) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_member(Arc::clone(&room), Arc::new(Mutex::new(mem.clone()))) - .await; + ee.on_room_member(room.read().await.deref(), &mem).await; } } } RoomEvent::RoomName(name) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_name(Arc::clone(&room), Arc::new(Mutex::new(name.clone()))) - .await; + ee.on_room_name(room.read().await.deref(), &name).await; } } } RoomEvent::RoomCanonicalAlias(canonical) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_canonical_alias( - Arc::clone(&room), - Arc::new(Mutex::new(canonical.clone())), - ) + ee.on_room_canonical_alias(room.read().await.deref(), &canonical) .await; } } @@ -611,12 +600,7 @@ impl Client { RoomEvent::RoomAliases(aliases) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_aliases( - Arc::clone(&room), - Arc::new(Mutex::new(aliases.clone())), - ) + ee.on_room_aliases(room.read().await.deref(), &aliases) .await; } } @@ -624,32 +608,21 @@ impl Client { RoomEvent::RoomAvatar(avatar) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_avatar(Arc::clone(&room), Arc::new(Mutex::new(avatar.clone()))) - .await; + ee.on_room_avatar(room.read().await.deref(), &avatar).await; } } } RoomEvent::RoomMessage(msg) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_message(Arc::clone(&room), Arc::new(Mutex::new(msg.clone()))) - .await; + ee.on_room_message(room.read().await.deref(), &msg).await; } } } RoomEvent::RoomMessageFeedback(msg_feedback) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_message_feedback( - Arc::clone(&room), - Arc::new(Mutex::new(msg_feedback.clone())), - ) + ee.on_room_message_feedback(room.read().await.deref(), &msg_feedback) .await; } } @@ -657,12 +630,7 @@ impl Client { RoomEvent::RoomRedaction(redaction) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_redaction( - Arc::clone(&room), - Arc::new(Mutex::new(redaction.clone())), - ) + ee.on_room_redaction(room.read().await.deref(), &redaction) .await; } } @@ -670,12 +638,7 @@ impl Client { RoomEvent::RoomPowerLevels(power) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_room_power_levels( - Arc::clone(&room), - Arc::new(Mutex::new(power.clone())), - ) + ee.on_room_power_levels(room.read().await.deref(), &power) .await; } } @@ -684,40 +647,26 @@ impl Client { } } - pub(crate) async fn emit_state_event(&mut self, room_id: &RoomId, event: &mut StateEvent) { + pub(crate) async fn emit_state_event(&self, room_id: &RoomId, event: &StateEvent) { match event { StateEvent::RoomMember(member) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_state_member( - Arc::clone(&room), - Arc::new(Mutex::new(member.clone())), - ) - .await; + ee.on_state_member(room.read().await.deref(), &member).await; } } } StateEvent::RoomName(name) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_state_name(Arc::clone(&room), Arc::new(Mutex::new(name.clone()))) - .await; + ee.on_state_name(room.read().await.deref(), &name).await; } } } StateEvent::RoomCanonicalAlias(canonical) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_state_canonical_alias( - Arc::clone(&room), - Arc::new(Mutex::new(canonical.clone())), - ) + ee.on_state_canonical_alias(room.read().await.deref(), &canonical) .await; } } @@ -725,12 +674,7 @@ impl Client { StateEvent::RoomAliases(aliases) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_state_aliases( - Arc::clone(&room), - Arc::new(Mutex::new(aliases.clone())), - ) + ee.on_state_aliases(room.read().await.deref(), &aliases) .await; } } @@ -738,25 +682,14 @@ impl Client { StateEvent::RoomAvatar(avatar) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_state_avatar( - Arc::clone(&room), - Arc::new(Mutex::new(avatar.clone())), - ) - .await; + ee.on_state_avatar(room.read().await.deref(), &avatar).await; } } } StateEvent::RoomPowerLevels(power) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_state_power_levels( - Arc::clone(&room), - Arc::new(Mutex::new(power.clone())), - ) + ee.on_state_power_levels(room.read().await.deref(), &power) .await; } } @@ -764,12 +697,7 @@ impl Client { StateEvent::RoomJoinRules(rules) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_state_join_rules( - Arc::clone(&room), - Arc::new(Mutex::new(rules.clone())), - ) + ee.on_state_join_rules(room.read().await.deref(), &rules) .await; } } @@ -778,21 +706,12 @@ impl Client { } } - pub(crate) async fn emit_account_data_event( - &mut self, - room_id: &RoomId, - event: &mut NonRoomEvent, - ) { + pub(crate) async fn emit_account_data_event(&self, room_id: &RoomId, event: &NonRoomEvent) { match event { NonRoomEvent::Presence(presence) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_presence( - Arc::clone(&room), - Arc::new(Mutex::new(presence.clone())), - ) + ee.on_account_presence(room.read().await.deref(), &presence) .await; } } @@ -800,12 +719,7 @@ impl Client { NonRoomEvent::IgnoredUserList(ignored) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_ignored_users( - Arc::clone(&room), - Arc::new(Mutex::new(ignored.clone())), - ) + ee.on_account_ignored_users(room.read().await.deref(), &ignored) .await; } } @@ -813,12 +727,7 @@ impl Client { NonRoomEvent::PushRules(rules) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_push_rules( - Arc::clone(&room), - Arc::new(Mutex::new(rules.clone())), - ) + ee.on_account_push_rules(room.read().await.deref(), &rules) .await; } } @@ -826,12 +735,7 @@ impl Client { NonRoomEvent::FullyRead(full_read) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_data_fully_read( - Arc::clone(&room), - Arc::new(Mutex::new(full_read.clone())), - ) + ee.on_account_data_fully_read(room.read().await.deref(), &full_read) .await; } } @@ -840,21 +744,12 @@ impl Client { } } - pub(crate) async fn emit_ephemeral_event( - &mut self, - room_id: &RoomId, - event: &mut NonRoomEvent, - ) { + pub(crate) async fn emit_ephemeral_event(&self, room_id: &RoomId, event: &NonRoomEvent) { match event { NonRoomEvent::Presence(presence) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_presence( - Arc::clone(&room), - Arc::new(Mutex::new(presence.clone())), - ) + ee.on_account_presence(room.read().await.deref(), &presence) .await; } } @@ -862,12 +757,7 @@ impl Client { NonRoomEvent::IgnoredUserList(ignored) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_ignored_users( - Arc::clone(&room), - Arc::new(Mutex::new(ignored.clone())), - ) + ee.on_account_ignored_users(room.read().await.deref(), &ignored) .await; } } @@ -875,12 +765,7 @@ impl Client { NonRoomEvent::PushRules(rules) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_push_rules( - Arc::clone(&room), - Arc::new(Mutex::new(rules.clone())), - ) + ee.on_account_push_rules(room.read().await.deref(), &rules) .await; } } @@ -888,12 +773,7 @@ impl Client { NonRoomEvent::FullyRead(full_read) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_account_data_fully_read( - Arc::clone(&room), - Arc::new(Mutex::new(full_read.clone())), - ) + ee.on_account_data_fully_read(room.read().await.deref(), &full_read) .await; } } @@ -902,16 +782,10 @@ impl Client { } } - pub(crate) async fn emit_presence_event( - &mut self, - room_id: &RoomId, - event: &mut PresenceEvent, - ) { + pub(crate) async fn emit_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.lock() - .await - .on_presence_event(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + ee.on_presence_event(room.read().await.deref(), &event) .await; } } diff --git a/src/event_emitter/mod.rs b/src/event_emitter/mod.rs index 0129cadf..6ee62d18 100644 --- a/src/event_emitter/mod.rs +++ b/src/event_emitter/mod.rs @@ -13,8 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use crate::events::{ fully_read::FullyReadEvent, ignored_user_list::IgnoredUserListEvent, @@ -34,7 +32,6 @@ use crate::events::{ }; use crate::models::Room; -use tokio::sync::Mutex; /// This trait allows any type implementing `EventEmitter` to specify event callbacks for each event. /// The `AsyncClient` calls each method when the corresponding event is received. /// @@ -57,15 +54,14 @@ use tokio::sync::Mutex; /// /// #[async_trait::async_trait] /// impl EventEmitter for EventCallback { -/// async fn on_room_message(&mut self, room: Arc>, event: Arc>) { +/// async fn on_room_message(&self, room: &Room, event: &MessageEvent) { /// if let MessageEvent { /// content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), /// sender, /// .. -/// } = event.lock().await.deref() +/// } = event /// { -/// let rooms = room.lock().await; -/// let member = rooms.members.get(&sender).unwrap(); +/// let member = room.members.get(&sender).unwrap(); /// println!( /// "{}: {}", /// member @@ -82,193 +78,128 @@ use tokio::sync::Mutex; pub trait EventEmitter: Send + Sync { // ROOM EVENTS from `IncomingTimeline` /// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event. - async fn on_room_member(&mut self, _: Arc>, _: Arc>) {} + async fn on_room_member(&self, _: &Room, _: &MemberEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event. - async fn on_room_name(&mut self, _: Arc>, _: Arc>) {} + async fn on_room_name(&self, _: &Room, _: &NameEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event. - async fn on_room_canonical_alias( - &mut self, - _: Arc>, - _: Arc>, - ) { - } + async fn on_room_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event. - async fn on_room_aliases(&mut self, _: Arc>, _: Arc>) {} + async fn on_room_aliases(&self, _: &Room, _: &AliasesEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event. - async fn on_room_avatar(&mut self, _: Arc>, _: Arc>) {} + async fn on_room_avatar(&self, _: &Room, _: &AvatarEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event. - async fn on_room_message(&mut self, _: Arc>, _: Arc>) {} + async fn on_room_message(&self, _: &Room, _: &MessageEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event. - async fn on_room_message_feedback( - &mut self, - _: Arc>, - _: Arc>, - ) { - } + async fn on_room_message_feedback(&self, _: &Room, _: &FeedbackEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event. - async fn on_room_redaction(&mut self, _: Arc>, _: Arc>) {} + async fn on_room_redaction(&self, _: &Room, _: &RedactionEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event. - async fn on_room_power_levels(&mut self, _: Arc>, _: Arc>) { - } + async fn on_room_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {} // `RoomEvent`s from `IncomingState` /// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event. - async fn on_state_member(&mut self, _: Arc>, _: Arc>) {} + async fn on_state_member(&self, _: &Room, _: &MemberEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomName` event. - async fn on_state_name(&mut self, _: Arc>, _: Arc>) {} + async fn on_state_name(&self, _: &Room, _: &NameEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event. - async fn on_state_canonical_alias( - &mut self, - _: Arc>, - _: Arc>, - ) { - } + async fn on_state_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event. - async fn on_state_aliases(&mut self, _: Arc>, _: Arc>) {} + async fn on_state_aliases(&self, _: &Room, _: &AliasesEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event. - async fn on_state_avatar(&mut self, _: Arc>, _: Arc>) {} + async fn on_state_avatar(&self, _: &Room, _: &AvatarEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event. - async fn on_state_power_levels( - &mut self, - _: Arc>, - _: Arc>, - ) { - } + async fn on_state_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event. - async fn on_state_join_rules(&mut self, _: Arc>, _: Arc>) {} + async fn on_state_join_rules(&self, _: &Room, _: &JoinRulesEvent) {} // `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData` /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event. - async fn on_account_presence(&mut self, _: Arc>, _: Arc>) {} + async fn on_account_presence(&self, _: &Room, _: &PresenceEvent) {} /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event. - async fn on_account_ignored_users( - &mut self, - _: Arc>, - _: Arc>, - ) { - } + async fn on_account_ignored_users(&self, _: &Room, _: &IgnoredUserListEvent) {} /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event. - async fn on_account_push_rules(&mut self, _: Arc>, _: Arc>) {} + async fn on_account_push_rules(&self, _: &Room, _: &PushRulesEvent) {} /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. - async fn on_account_data_fully_read( - &mut self, - _: Arc>, - _: Arc>, - ) { - } + async fn on_account_data_fully_read(&self, _: &Room, _: &FullyReadEvent) {} // `PresenceEvent` is a struct so there is only the one method /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. - async fn on_presence_event(&mut self, _: Arc>, _: Arc>) {} + async fn on_presence_event(&self, _: &Room, _: &PresenceEvent) {} } #[cfg(test)] mod test { use super::*; - + use std::sync::Arc; + use tokio::sync::Mutex; + #[derive(Clone)] pub struct EvEmitterTest(Arc>>); #[async_trait::async_trait] impl EventEmitter for EvEmitterTest { - async fn on_room_member(&mut self, _: Arc>, _: Arc>) { + async fn on_room_member(&self, _: &Room, _: &MemberEvent) { self.0.lock().await.push("member".to_string()) } - async fn on_room_name(&mut self, _: Arc>, _: Arc>) { + async fn on_room_name(&self, _: &Room, _: &NameEvent) { self.0.lock().await.push("name".to_string()) } - async fn on_room_canonical_alias( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_room_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) { self.0.lock().await.push("canonical".to_string()) } - async fn on_room_aliases(&mut self, _: Arc>, _: Arc>) { + async fn on_room_aliases(&self, _: &Room, _: &AliasesEvent) { self.0.lock().await.push("aliases".to_string()) } - async fn on_room_avatar(&mut self, _: Arc>, _: Arc>) { + async fn on_room_avatar(&self, _: &Room, _: &AvatarEvent) { self.0.lock().await.push("avatar".to_string()) } - async fn on_room_message(&mut self, _: Arc>, _: Arc>) { + async fn on_room_message(&self, _: &Room, _: &MessageEvent) { self.0.lock().await.push("message".to_string()) } - async fn on_room_message_feedback( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_room_message_feedback(&self, _: &Room, _: &FeedbackEvent) { self.0.lock().await.push("feedback".to_string()) } - async fn on_room_redaction(&mut self, _: Arc>, _: Arc>) { + async fn on_room_redaction(&self, _: &Room, _: &RedactionEvent) { self.0.lock().await.push("redaction".to_string()) } - async fn on_room_power_levels( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_room_power_levels(&self, _: &Room, _: &PowerLevelsEvent) { self.0.lock().await.push("power".to_string()) } - async fn on_state_member(&mut self, _: Arc>, _: Arc>) { + async fn on_state_member(&self, _: &Room, _: &MemberEvent) { self.0.lock().await.push("state member".to_string()) } - async fn on_state_name(&mut self, _: Arc>, _: Arc>) { + async fn on_state_name(&self, _: &Room, _: &NameEvent) { self.0.lock().await.push("state name".to_string()) } - async fn on_state_canonical_alias( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_state_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) { self.0.lock().await.push("state canonical".to_string()) } - async fn on_state_aliases(&mut self, _: Arc>, _: Arc>) { + async fn on_state_aliases(&self, _: &Room, _: &AliasesEvent) { self.0.lock().await.push("state aliases".to_string()) } - async fn on_state_avatar(&mut self, _: Arc>, _: Arc>) { + async fn on_state_avatar(&self, _: &Room, _: &AvatarEvent) { self.0.lock().await.push("state avatar".to_string()) } - async fn on_state_power_levels( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_state_power_levels(&self, _: &Room, _: &PowerLevelsEvent) { self.0.lock().await.push("state power".to_string()) } - async fn on_state_join_rules( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_state_join_rules(&self, _: &Room, _: &JoinRulesEvent) { self.0.lock().await.push("state rules".to_string()) } - async fn on_account_presence(&mut self, _: Arc>, _: Arc>) { + async fn on_account_presence(&self, _: &Room, _: &PresenceEvent) { self.0.lock().await.push("account presence".to_string()) } - async fn on_account_ignored_users( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_account_ignored_users(&self, _: &Room, _: &IgnoredUserListEvent) { self.0.lock().await.push("account ignore".to_string()) } - async fn on_account_push_rules( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_account_push_rules(&self, _: &Room, _: &PushRulesEvent) { self.0.lock().await.push("".to_string()) } - async fn on_account_data_fully_read( - &mut self, - _: Arc>, - _: Arc>, - ) { + async fn on_account_data_fully_read(&self, _: &Room, _: &FullyReadEvent) { self.0.lock().await.push("account read".to_string()) } - async fn on_presence_event(&mut self, _: Arc>, _: Arc>) { + async fn on_presence_event(&self, _: &Room, _: &PresenceEvent) { self.0.lock().await.push("presence event".to_string()) } } @@ -303,11 +234,9 @@ mod test { let vec = Arc::new(Mutex::new(Vec::new())); let test_vec = Arc::clone(&vec); - let emitter = Arc::new(Mutex::new( - Box::new(EvEmitterTest(vec)) as Box<(dyn EventEmitter)> - )); + let emitter = Box::new(EvEmitterTest(vec)) as Box<(dyn EventEmitter)>; let mut client = AsyncClient::new(homeserver, Some(session)).unwrap(); - client.add_event_emitter(Arc::clone(&emitter)).await; + client.add_event_emitter(emitter).await; let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let _response = client.sync(sync_settings).await.unwrap(); diff --git a/src/models/room.rs b/src/models/room.rs index f307ec93..b0f1b735 100644 --- a/src/models/room.rs +++ b/src/models/room.rs @@ -427,7 +427,7 @@ mod test { let room = &rooms .get(&RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap()) .unwrap() - .lock() + .read() .await; assert_eq!(2, room.members.len()); From 6d1cbcd9ca51cfcd5cad3cb5f8a1f6ea2c5c824d Mon Sep 17 00:00:00 2001 From: Devin R Date: Tue, 14 Apr 2020 18:14:14 -0400 Subject: [PATCH 06/11] remove runtime and clean up example --- examples/command_bot.rs | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index 222b4e34..c3004280 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -1,5 +1,3 @@ -use std::ops::Deref; -use std::sync::Arc; use std::{env, process::exit}; use url::Url; @@ -8,13 +6,10 @@ use matrix_sdk::{ events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, }; -use tokio::runtime::{Handle, Runtime}; -use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::Mutex; struct CommandBot { client: Mutex, - // sender: Sender<(RoomId, MessageEventContent)> } impl CommandBot { @@ -66,7 +61,6 @@ async fn login_and_sync( homeserver_url: String, username: String, password: String, - exec: Handle, ) -> Result<(), matrix_sdk::Error> { let client_config = AsyncClientConfig::new(); // .proxy("http://localhost:8080")? @@ -89,16 +83,13 @@ async fn login_and_sync( println!("logged in as {}", username); - exec.spawn(async move { - client.sync_forever(SyncSettings::new(), |_| async {}).await; - }) - .await - .unwrap(); + client.sync_forever(SyncSettings::new(), |_| async {}).await; Ok(()) } -fn main() -> Result<(), matrix_sdk::Error> { +#[tokio::main] +async fn main() -> Result<(), matrix_sdk::Error> { let (homeserver_url, username, password) = match (env::args().nth(1), env::args().nth(2), env::args().nth(3)) { (Some(a), Some(b), Some(c)) => (a, b, c), @@ -111,15 +102,6 @@ fn main() -> Result<(), matrix_sdk::Error> { } }; - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .threaded_scheduler() - .enable_all() - .build() - .unwrap(); - - let exec = runtime.handle().clone(); - - runtime.block_on(async { login_and_sync(homeserver_url, username, password, exec).await })?; + login_and_sync(homeserver_url, username, password).await?; Ok(()) } From 49e962e9c4bcf2af103b6506315606daffaff723 Mon Sep 17 00:00:00 2001 From: Devin R Date: Wed, 15 Apr 2020 06:58:25 -0400 Subject: [PATCH 07/11] event_emitter: use Arc, @@ -22,7 +24,7 @@ impl CommandBot { #[async_trait::async_trait] impl EventEmitter for CommandBot { - async fn on_room_message(&self, room: &Room, event: &MessageEvent) { + async fn on_room_message(&self, room: Arc>, event: &MessageEvent) { let msg_body = if let MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), .. @@ -40,7 +42,7 @@ impl EventEmitter for CommandBot { formatted_body: None, relates_to: None, }); - let room_id = &room.room_id; + let room_id = { room.read().await.room_id.clone() }; println!("sending"); @@ -62,9 +64,10 @@ async fn login_and_sync( username: String, password: String, ) -> Result<(), matrix_sdk::Error> { - let client_config = AsyncClientConfig::new(); - // .proxy("http://localhost:8080")? - // .disable_ssl_verification(); + let client_config = AsyncClientConfig::new() + .proxy("http://localhost:8080")? + .disable_ssl_verification(); + let homeserver_url = Url::parse(&homeserver_url)?; let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); diff --git a/examples/login.rs b/examples/login.rs index 34f943a2..703fd2e0 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -1,4 +1,3 @@ -use std::ops::Deref; use std::sync::Arc; use std::{env, process::exit}; use url::Url; @@ -8,25 +7,30 @@ use matrix_sdk::{ events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, }; -use tokio::sync::Mutex; +use tokio::sync::RwLock; struct EventCallback; #[async_trait::async_trait] impl EventEmitter for EventCallback { - async fn on_room_message(&self, room: &Room, event: &MessageEvent) { + async fn on_room_message(&self, room: Arc>, event: &MessageEvent) { if let MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), sender, .. } = event { - let member = room.members.get(&sender).unwrap(); - println!( - "{}: {}", - member.display_name.as_ref().unwrap_or(&sender.to_string()), - msg_body - ); + let name = { + // any reads or + let room = room.read().await; + let member = room.members.get(&sender).unwrap(); + member + .display_name + .as_ref() + .map(ToString::to_string) + .unwrap_or(sender.to_string()) + }; + println!("{}: {}", name, msg_body); } } } diff --git a/src/base_client.rs b/src/base_client.rs index 5c8a5bf3..686f8c15 100644 --- a/src/base_client.rs +++ b/src/base_client.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; #[cfg(feature = "encryption")] use std::collections::HashSet; use std::fmt; -use std::ops::Deref; use std::sync::Arc; #[cfg(feature = "encryption")] @@ -578,21 +577,21 @@ impl Client { RoomEvent::RoomMember(mem) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_member(room.read().await.deref(), &mem).await; + ee.on_room_member(Arc::clone(&room), &mem).await; } } } RoomEvent::RoomName(name) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_name(room.read().await.deref(), &name).await; + ee.on_room_name(Arc::clone(&room), &name).await; } } } RoomEvent::RoomCanonicalAlias(canonical) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_canonical_alias(room.read().await.deref(), &canonical) + ee.on_room_canonical_alias(Arc::clone(&room), &canonical) .await; } } @@ -600,29 +599,28 @@ impl Client { RoomEvent::RoomAliases(aliases) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_aliases(room.read().await.deref(), &aliases) - .await; + ee.on_room_aliases(Arc::clone(&room), &aliases).await; } } } RoomEvent::RoomAvatar(avatar) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_avatar(room.read().await.deref(), &avatar).await; + ee.on_room_avatar(Arc::clone(&room), &avatar).await; } } } RoomEvent::RoomMessage(msg) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_message(room.read().await.deref(), &msg).await; + ee.on_room_message(Arc::clone(&room), &msg).await; } } } RoomEvent::RoomMessageFeedback(msg_feedback) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_message_feedback(room.read().await.deref(), &msg_feedback) + ee.on_room_message_feedback(Arc::clone(&room), &msg_feedback) .await; } } @@ -630,16 +628,14 @@ impl Client { RoomEvent::RoomRedaction(redaction) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_redaction(room.read().await.deref(), &redaction) - .await; + ee.on_room_redaction(Arc::clone(&room), &redaction).await; } } } RoomEvent::RoomPowerLevels(power) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_room_power_levels(room.read().await.deref(), &power) - .await; + ee.on_room_power_levels(Arc::clone(&room), &power).await; } } } @@ -652,21 +648,21 @@ impl Client { StateEvent::RoomMember(member) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_state_member(room.read().await.deref(), &member).await; + ee.on_state_member(Arc::clone(&room), &member).await; } } } StateEvent::RoomName(name) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_state_name(room.read().await.deref(), &name).await; + ee.on_state_name(Arc::clone(&room), &name).await; } } } StateEvent::RoomCanonicalAlias(canonical) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_state_canonical_alias(room.read().await.deref(), &canonical) + ee.on_state_canonical_alias(Arc::clone(&room), &canonical) .await; } } @@ -674,31 +670,28 @@ impl Client { StateEvent::RoomAliases(aliases) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_state_aliases(room.read().await.deref(), &aliases) - .await; + ee.on_state_aliases(Arc::clone(&room), &aliases).await; } } } StateEvent::RoomAvatar(avatar) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_state_avatar(room.read().await.deref(), &avatar).await; + ee.on_state_avatar(Arc::clone(&room), &avatar).await; } } } StateEvent::RoomPowerLevels(power) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_state_power_levels(room.read().await.deref(), &power) - .await; + ee.on_state_power_levels(Arc::clone(&room), &power).await; } } } StateEvent::RoomJoinRules(rules) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_state_join_rules(room.read().await.deref(), &rules) - .await; + ee.on_state_join_rules(Arc::clone(&room), &rules).await; } } } @@ -711,15 +704,14 @@ impl Client { NonRoomEvent::Presence(presence) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_presence(room.read().await.deref(), &presence) - .await; + ee.on_account_presence(Arc::clone(&room), &presence).await; } } } NonRoomEvent::IgnoredUserList(ignored) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_ignored_users(room.read().await.deref(), &ignored) + ee.on_account_ignored_users(Arc::clone(&room), &ignored) .await; } } @@ -727,15 +719,14 @@ impl Client { NonRoomEvent::PushRules(rules) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_push_rules(room.read().await.deref(), &rules) - .await; + ee.on_account_push_rules(Arc::clone(&room), &rules).await; } } } NonRoomEvent::FullyRead(full_read) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_data_fully_read(room.read().await.deref(), &full_read) + ee.on_account_data_fully_read(Arc::clone(&room), &full_read) .await; } } @@ -749,15 +740,14 @@ impl Client { NonRoomEvent::Presence(presence) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_presence(room.read().await.deref(), &presence) - .await; + ee.on_account_presence(Arc::clone(&room), &presence).await; } } } NonRoomEvent::IgnoredUserList(ignored) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_ignored_users(room.read().await.deref(), &ignored) + ee.on_account_ignored_users(Arc::clone(&room), &ignored) .await; } } @@ -765,15 +755,14 @@ impl Client { NonRoomEvent::PushRules(rules) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_push_rules(room.read().await.deref(), &rules) - .await; + ee.on_account_push_rules(Arc::clone(&room), &rules).await; } } } NonRoomEvent::FullyRead(full_read) => { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_account_data_fully_read(room.read().await.deref(), &full_read) + ee.on_account_data_fully_read(Arc::clone(&room), &full_read) .await; } } @@ -785,8 +774,7 @@ impl Client { pub(crate) async fn emit_presence_event(&self, room_id: &RoomId, event: &PresenceEvent) { if let Some(ee) = &self.event_emitter { if let Some(room) = self.get_room(&room_id) { - ee.on_presence_event(room.read().await.deref(), &event) - .await; + ee.on_presence_event(Arc::clone(&room), &event).await; } } } diff --git a/src/event_emitter/mod.rs b/src/event_emitter/mod.rs index 6ee62d18..f3afd392 100644 --- a/src/event_emitter/mod.rs +++ b/src/event_emitter/mod.rs @@ -12,6 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use crate::events::{ fully_read::FullyReadEvent, @@ -31,6 +32,7 @@ use crate::events::{ }, }; use crate::models::Room; +use tokio::sync::RwLock; /// This trait allows any type implementing `EventEmitter` to specify event callbacks for each event. /// The `AsyncClient` calls each method when the corresponding event is received. @@ -78,53 +80,53 @@ use crate::models::Room; pub trait EventEmitter: Send + Sync { // ROOM EVENTS from `IncomingTimeline` /// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event. - async fn on_room_member(&self, _: &Room, _: &MemberEvent) {} + async fn on_room_member(&self, _: Arc>, _: &MemberEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event. - async fn on_room_name(&self, _: &Room, _: &NameEvent) {} + async fn on_room_name(&self, _: Arc>, _: &NameEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event. - async fn on_room_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {} + async fn on_room_canonical_alias(&self, _: Arc>, _: &CanonicalAliasEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event. - async fn on_room_aliases(&self, _: &Room, _: &AliasesEvent) {} + async fn on_room_aliases(&self, _: Arc>, _: &AliasesEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event. - async fn on_room_avatar(&self, _: &Room, _: &AvatarEvent) {} + async fn on_room_avatar(&self, _: Arc>, _: &AvatarEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event. - async fn on_room_message(&self, _: &Room, _: &MessageEvent) {} + async fn on_room_message(&self, _: Arc>, _: &MessageEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event. - async fn on_room_message_feedback(&self, _: &Room, _: &FeedbackEvent) {} + async fn on_room_message_feedback(&self, _: Arc>, _: &FeedbackEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event. - async fn on_room_redaction(&self, _: &Room, _: &RedactionEvent) {} + async fn on_room_redaction(&self, _: Arc>, _: &RedactionEvent) {} /// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event. - async fn on_room_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {} + async fn on_room_power_levels(&self, _: Arc>, _: &PowerLevelsEvent) {} // `RoomEvent`s from `IncomingState` /// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event. - async fn on_state_member(&self, _: &Room, _: &MemberEvent) {} + async fn on_state_member(&self, _: Arc>, _: &MemberEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomName` event. - async fn on_state_name(&self, _: &Room, _: &NameEvent) {} + async fn on_state_name(&self, _: Arc>, _: &NameEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event. - async fn on_state_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) {} + async fn on_state_canonical_alias(&self, _: Arc>, _: &CanonicalAliasEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event. - async fn on_state_aliases(&self, _: &Room, _: &AliasesEvent) {} + async fn on_state_aliases(&self, _: Arc>, _: &AliasesEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event. - async fn on_state_avatar(&self, _: &Room, _: &AvatarEvent) {} + async fn on_state_avatar(&self, _: Arc>, _: &AvatarEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event. - async fn on_state_power_levels(&self, _: &Room, _: &PowerLevelsEvent) {} + async fn on_state_power_levels(&self, _: Arc>, _: &PowerLevelsEvent) {} /// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event. - async fn on_state_join_rules(&self, _: &Room, _: &JoinRulesEvent) {} + async fn on_state_join_rules(&self, _: Arc>, _: &JoinRulesEvent) {} // `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData` /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event. - async fn on_account_presence(&self, _: &Room, _: &PresenceEvent) {} + async fn on_account_presence(&self, _: Arc>, _: &PresenceEvent) {} /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event. - async fn on_account_ignored_users(&self, _: &Room, _: &IgnoredUserListEvent) {} + async fn on_account_ignored_users(&self, _: Arc>, _: &IgnoredUserListEvent) {} /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event. - async fn on_account_push_rules(&self, _: &Room, _: &PushRulesEvent) {} + async fn on_account_push_rules(&self, _: Arc>, _: &PushRulesEvent) {} /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. - async fn on_account_data_fully_read(&self, _: &Room, _: &FullyReadEvent) {} + async fn on_account_data_fully_read(&self, _: Arc>, _: &FullyReadEvent) {} // `PresenceEvent` is a struct so there is only the one method /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. - async fn on_presence_event(&self, _: &Room, _: &PresenceEvent) {} + async fn on_presence_event(&self, _: Arc>, _: &PresenceEvent) {} } #[cfg(test)] @@ -137,69 +139,69 @@ mod test { #[async_trait::async_trait] impl EventEmitter for EvEmitterTest { - async fn on_room_member(&self, _: &Room, _: &MemberEvent) { + async fn on_room_member(&self, _: Arc>, _: &MemberEvent) { self.0.lock().await.push("member".to_string()) } - async fn on_room_name(&self, _: &Room, _: &NameEvent) { + async fn on_room_name(&self, _: Arc>, _: &NameEvent) { self.0.lock().await.push("name".to_string()) } - async fn on_room_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) { + async fn on_room_canonical_alias(&self, _: Arc>, _: &CanonicalAliasEvent) { self.0.lock().await.push("canonical".to_string()) } - async fn on_room_aliases(&self, _: &Room, _: &AliasesEvent) { + async fn on_room_aliases(&self, _: Arc>, _: &AliasesEvent) { self.0.lock().await.push("aliases".to_string()) } - async fn on_room_avatar(&self, _: &Room, _: &AvatarEvent) { + async fn on_room_avatar(&self, _: Arc>, _: &AvatarEvent) { self.0.lock().await.push("avatar".to_string()) } - async fn on_room_message(&self, _: &Room, _: &MessageEvent) { + async fn on_room_message(&self, _: Arc>, _: &MessageEvent) { self.0.lock().await.push("message".to_string()) } - async fn on_room_message_feedback(&self, _: &Room, _: &FeedbackEvent) { + async fn on_room_message_feedback(&self, _: Arc>, _: &FeedbackEvent) { self.0.lock().await.push("feedback".to_string()) } - async fn on_room_redaction(&self, _: &Room, _: &RedactionEvent) { + async fn on_room_redaction(&self, _: Arc>, _: &RedactionEvent) { self.0.lock().await.push("redaction".to_string()) } - async fn on_room_power_levels(&self, _: &Room, _: &PowerLevelsEvent) { + async fn on_room_power_levels(&self, _: Arc>, _: &PowerLevelsEvent) { self.0.lock().await.push("power".to_string()) } - async fn on_state_member(&self, _: &Room, _: &MemberEvent) { + async fn on_state_member(&self, _: Arc>, _: &MemberEvent) { self.0.lock().await.push("state member".to_string()) } - async fn on_state_name(&self, _: &Room, _: &NameEvent) { + async fn on_state_name(&self, _: Arc>, _: &NameEvent) { self.0.lock().await.push("state name".to_string()) } - async fn on_state_canonical_alias(&self, _: &Room, _: &CanonicalAliasEvent) { + async fn on_state_canonical_alias(&self, _: Arc>, _: &CanonicalAliasEvent) { self.0.lock().await.push("state canonical".to_string()) } - async fn on_state_aliases(&self, _: &Room, _: &AliasesEvent) { + async fn on_state_aliases(&self, _: Arc>, _: &AliasesEvent) { self.0.lock().await.push("state aliases".to_string()) } - async fn on_state_avatar(&self, _: &Room, _: &AvatarEvent) { + async fn on_state_avatar(&self, _: Arc>, _: &AvatarEvent) { self.0.lock().await.push("state avatar".to_string()) } - async fn on_state_power_levels(&self, _: &Room, _: &PowerLevelsEvent) { + async fn on_state_power_levels(&self, _: Arc>, _: &PowerLevelsEvent) { self.0.lock().await.push("state power".to_string()) } - async fn on_state_join_rules(&self, _: &Room, _: &JoinRulesEvent) { + async fn on_state_join_rules(&self, _: Arc>, _: &JoinRulesEvent) { self.0.lock().await.push("state rules".to_string()) } - async fn on_account_presence(&self, _: &Room, _: &PresenceEvent) { + async fn on_account_presence(&self, _: Arc>, _: &PresenceEvent) { self.0.lock().await.push("account presence".to_string()) } - async fn on_account_ignored_users(&self, _: &Room, _: &IgnoredUserListEvent) { + async fn on_account_ignored_users(&self, _: Arc>, _: &IgnoredUserListEvent) { self.0.lock().await.push("account ignore".to_string()) } - async fn on_account_push_rules(&self, _: &Room, _: &PushRulesEvent) { + async fn on_account_push_rules(&self, _: Arc>, _: &PushRulesEvent) { self.0.lock().await.push("".to_string()) } - async fn on_account_data_fully_read(&self, _: &Room, _: &FullyReadEvent) { + async fn on_account_data_fully_read(&self, _: Arc>, _: &FullyReadEvent) { self.0.lock().await.push("account read".to_string()) } - async fn on_presence_event(&self, _: &Room, _: &PresenceEvent) { + async fn on_presence_event(&self, _: Arc>, _: &PresenceEvent) { self.0.lock().await.push("presence event".to_string()) } } From 3f9243a3261faeddf5d0619ecfb5c36dcf4b93f4 Mon Sep 17 00:00:00 2001 From: Devin R Date: Wed, 15 Apr 2020 07:52:29 -0400 Subject: [PATCH 08/11] command-bot: add comments, use timestamp to filter old messages --- examples/command_bot.rs | 31 ++++++++++++++++++++++--------- examples/login.rs | 3 ++- src/async_client.rs | 2 ++ src/event_emitter/mod.rs | 23 ++++++++++++----------- 4 files changed, 38 insertions(+), 21 deletions(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index 56925416..2f2213e5 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -1,23 +1,31 @@ use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use std::{env, process::exit}; -use url::Url; - +use js_int::UInt; use matrix_sdk::{ self, events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, }; use tokio::sync::{Mutex, RwLock}; +use url::Url; struct CommandBot { + /// This clone of the `AsyncClient` will send requests to the server, + /// while the other keeps us in sync with the server using `sync_forever`. client: Mutex, + /// A timestamp so we only respond to messages sent after the bot is running. + start_time: UInt, } impl CommandBot { pub fn new(client: AsyncClient) -> Self { + let now = SystemTime::now(); + let timestamp = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); Self { client: Mutex::new(client), + start_time: UInt::new(timestamp as u64).unwrap(), } } } @@ -25,30 +33,34 @@ impl CommandBot { #[async_trait::async_trait] impl EventEmitter for CommandBot { async fn on_room_message(&self, room: Arc>, event: &MessageEvent) { - let msg_body = if let MessageEvent { + let (msg_body, timestamp) = if let MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), + origin_server_ts, .. } = event { - msg_body.clone() + (msg_body.clone(), *origin_server_ts) } else { - String::new() + (String::new(), UInt::min_value()) }; - if msg_body.contains("!party") { + if msg_body.contains("!party") && timestamp > self.start_time { let content = MessageEventContent::Text(TextMessageEventContent { body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(), format: None, formatted_body: None, relates_to: None, }); - let room_id = { room.read().await.room_id.clone() }; + // we clone here to hold the lock for as little time as possible. + let room_id = room.read().await.room_id.clone(); println!("sending"); self.client .lock() .await + // send our message to the room we found the "!party" command in + // the last parameter is an optional Uuid which we don't care about. .room_send(&room_id, content, None) .await .unwrap(); @@ -58,7 +70,6 @@ impl EventEmitter for CommandBot { } } -#[allow(clippy::for_loop_over_option)] async fn login_and_sync( homeserver_url: String, username: String, @@ -69,8 +80,9 @@ async fn login_and_sync( .disable_ssl_verification(); let homeserver_url = Url::parse(&homeserver_url)?; + // create a new AsyncClient with the given homeserver url and config let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); - + // add our CommandBot to be notified of incoming messages client .add_event_emitter(Box::new(CommandBot::new(client.clone()))) .await; @@ -86,6 +98,7 @@ async fn login_and_sync( println!("logged in as {}", username); + // this keeps state from the server streaming in to CommandBot via the EventEmitter trait client.sync_forever(SyncSettings::new(), |_| async {}).await; Ok(()) diff --git a/examples/login.rs b/examples/login.rs index 703fd2e0..150684c7 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -21,7 +21,8 @@ impl EventEmitter for EventCallback { } = event { let name = { - // any reads or + // any reads should be held for the shortest time possible to + // avoid dead locks let room = room.read().await; let member = room.members.get(&sender).unwrap(); member diff --git a/src/async_client.rs b/src/async_client.rs index d52b724c..86a9e139 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -53,6 +53,8 @@ const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Clone)] /// An async/await enabled Matrix client. +/// +/// All of the state is held in an `Arc` so the `AsyncClient` can be cloned freely. pub struct AsyncClient { /// The URL of the homeserver to connect to. homeserver: Url, diff --git a/src/event_emitter/mod.rs b/src/event_emitter/mod.rs index f3afd392..2d675521 100644 --- a/src/event_emitter/mod.rs +++ b/src/event_emitter/mod.rs @@ -50,28 +50,29 @@ use tokio::sync::RwLock; /// }, /// AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, /// }; -/// use tokio::sync::Mutex; +/// use tokio::sync::RwLock; /// /// struct EventCallback; /// /// #[async_trait::async_trait] /// impl EventEmitter for EventCallback { -/// async fn on_room_message(&self, room: &Room, event: &MessageEvent) { +/// async fn on_room_message(&self, room: Arc>, event: &MessageEvent) { /// if let MessageEvent { /// content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), /// sender, /// .. /// } = event /// { -/// let member = room.members.get(&sender).unwrap(); -/// println!( -/// "{}: {}", -/// member -/// .display_name -/// .as_ref() -/// .unwrap_or(&sender.to_string()), -/// msg_body -/// ); +/// let name = { +/// let room = room.read().await; +/// let member = room.members.get(&sender).unwrap(); +/// member +/// .display_name +/// .as_ref() +/// .map(ToString::to_string) +/// .unwrap_or(sender.to_string()) +/// }; +/// println!("{}: {}", name, msg_body); /// } /// } /// } From c495a50c52c95a9545630153848dc5513cd3159f Mon Sep 17 00:00:00 2001 From: Devin R Date: Wed, 15 Apr 2020 08:29:34 -0400 Subject: [PATCH 09/11] async_client: make pub API take &self instead of &mut, use read where possible --- examples/command_bot.rs | 20 +++++++++++--------- src/async_client.rs | 29 +++++++++++++---------------- src/base_client.rs | 2 +- src/models/room.rs | 2 +- src/request_builder.rs | 4 ++-- tests/async_client_tests.rs | 6 +++--- 6 files changed, 31 insertions(+), 32 deletions(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index 2f2213e5..20bbf7a1 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -8,13 +8,13 @@ use matrix_sdk::{ events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, }; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use url::Url; struct CommandBot { /// This clone of the `AsyncClient` will send requests to the server, /// while the other keeps us in sync with the server using `sync_forever`. - client: Mutex, + client: AsyncClient, /// A timestamp so we only respond to messages sent after the bot is running. start_time: UInt, } @@ -24,7 +24,7 @@ impl CommandBot { let now = SystemTime::now(); let timestamp = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); Self { - client: Mutex::new(client), + client, start_time: UInt::new(timestamp as u64).unwrap(), } } @@ -57,8 +57,6 @@ impl EventEmitter for CommandBot { println!("sending"); self.client - .lock() - .await // send our message to the room we found the "!party" command in // the last parameter is an optional Uuid which we don't care about. .room_send(&room_id, content, None) @@ -82,10 +80,6 @@ async fn login_and_sync( let homeserver_url = Url::parse(&homeserver_url)?; // create a new AsyncClient with the given homeserver url and config let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); - // add our CommandBot to be notified of incoming messages - client - .add_event_emitter(Box::new(CommandBot::new(client.clone()))) - .await; client .login( @@ -98,6 +92,14 @@ async fn login_and_sync( println!("logged in as {}", username); + // initial sync to set up state and so our bot doesn't respond to old messages + client.sync(SyncSettings::default()).await.unwrap(); + // add our CommandBot to be notified of incoming messages, we do this after the initial + // sync to avoid responding to messages before the bot was running. + client + .add_event_emitter(Box::new(CommandBot::new(client.clone()))) + .await; + // this keeps state from the server streaming in to CommandBot via the EventEmitter trait client.sync_forever(SyncSettings::new(), |_| async {}).await; diff --git a/src/async_client.rs b/src/async_client.rs index 86a9e139..5e298a8b 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -318,7 +318,7 @@ impl AsyncClient { /// only if the client also holds the encryption keys for this device. #[instrument(skip(password))] pub async fn login + std::fmt::Debug>( - &mut self, + &self, user: S, password: S, device_id: Option, @@ -368,7 +368,7 @@ impl AsyncClient { /// * alias - The `RoomId` or `RoomAliasId` of the room to be joined. /// An alias looks like this `#name:example.com` pub async fn join_room_by_id_or_alias( - &mut self, + &self, alias: &RoomIdOrAliasId, ) -> Result { let request = join_room_by_id_or_alias::Request { @@ -390,7 +390,7 @@ impl AsyncClient { /// /// * reason - Optional reason why the room member is being kicked out. pub async fn kick_user( - &mut self, + &self, room_id: &RoomId, user_id: &UserId, reason: Option, @@ -411,7 +411,7 @@ impl AsyncClient { /// /// * room_id - The `RoomId` of the room to leave. /// - pub async fn leave_room(&mut self, room_id: &RoomId) -> Result { + pub async fn leave_room(&self, room_id: &RoomId) -> Result { let request = leave_room::Request { room_id: room_id.clone(), }; @@ -428,7 +428,7 @@ impl AsyncClient { /// /// * user_id - The `UserId` of the user to invite to the room. pub async fn invite_user_by_id( - &mut self, + &self, room_id: &RoomId, user_id: &UserId, ) -> Result { @@ -451,7 +451,7 @@ impl AsyncClient { /// /// * invite_id - A third party id of a user to invite to the room. pub async fn invite_user_by_3pid( - &mut self, + &self, room_id: &RoomId, invite_id: &Invite3pid, ) -> Result { @@ -492,7 +492,7 @@ impl AsyncClient { /// # }); /// ``` pub async fn create_room>( - &mut self, + &self, room: R, ) -> Result { let request = room.into(); @@ -536,7 +536,7 @@ impl AsyncClient { /// # }); /// ``` pub async fn room_messages>( - &mut self, + &self, request: R, ) -> Result { let req = request.into(); @@ -549,10 +549,7 @@ impl AsyncClient { /// /// * `sync_settings` - Settings for the sync call. #[instrument] - pub async fn sync( - &mut self, - sync_settings: SyncSettings, - ) -> Result { + pub async fn sync(&self, sync_settings: SyncSettings) -> Result { let request = sync_events::Request { filter: None, since: sync_settings.token, @@ -701,7 +698,7 @@ impl AsyncClient { /// ``` #[instrument(skip(callback))] pub async fn sync_forever( - &mut self, + &self, sync_settings: SyncSettings, callback: impl Fn(sync_events::IncomingResponse) -> C + Send, ) where @@ -879,7 +876,7 @@ impl AsyncClient { /// }) /// ``` pub async fn room_send( - &mut self, + &self, room_id: &RoomId, #[allow(unused_mut)] mut content: MessageEventContent, txn_id: Option, @@ -894,7 +891,7 @@ impl AsyncClient { let room = client.joined_rooms.get(room_id); match room { - Some(r) => r.write().await.is_encrypted(), + Some(r) => r.read().await.is_encrypted(), None => false, } }; @@ -903,7 +900,7 @@ impl AsyncClient { let missing_sessions = { let client = self.base_client.read().await; let room = client.joined_rooms.get(room_id); - let room = room.as_ref().unwrap().write().await; + let room = room.as_ref().unwrap().read().await; let users = room.members.keys(); self.base_client .read() diff --git a/src/base_client.rs b/src/base_client.rs index 686f8c15..33a0314f 100644 --- a/src/base_client.rs +++ b/src/base_client.rs @@ -811,7 +811,7 @@ mod test { .with_body_from_file("tests/data/sync.json") .create(); - let mut client = AsyncClient::new(homeserver, Some(session)).unwrap(); + let client = AsyncClient::new(homeserver, Some(session)).unwrap(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); diff --git a/src/models/room.rs b/src/models/room.rs index b0f1b735..9701d04a 100644 --- a/src/models/room.rs +++ b/src/models/room.rs @@ -417,7 +417,7 @@ mod test { .with_body_from_file("tests/data/sync.json") .create(); - let mut client = AsyncClient::new(homeserver, Some(session)).unwrap(); + let client = AsyncClient::new(homeserver, Some(session)).unwrap(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); diff --git a/src/request_builder.rs b/src/request_builder.rs index 42f088f5..795e8534 100644 --- a/src/request_builder.rs +++ b/src/request_builder.rs @@ -341,7 +341,7 @@ mod test { .room_alias_name("room_alias") .topic("room topic") .visibility(Visibility::Private); - let mut cli = AsyncClient::new(homeserver, Some(session)).unwrap(); + let cli = AsyncClient::new(homeserver, Some(session)).unwrap(); assert!(cli.create_room(builder).await.is_ok()); } @@ -373,7 +373,7 @@ mod test { // TODO this makes ruma error `Err(IntoHttp(IntoHttpError(Query(Custom("unsupported value")))))`?? // .filter(RoomEventFilter::default()); - let mut cli = AsyncClient::new(homeserver, Some(session)).unwrap(); + let cli = AsyncClient::new(homeserver, Some(session)).unwrap(); assert!(cli.room_messages(builder).await.is_ok()); } } diff --git a/tests/async_client_tests.rs b/tests/async_client_tests.rs index b2e72ff7..3c9dacdc 100644 --- a/tests/async_client_tests.rs +++ b/tests/async_client_tests.rs @@ -17,7 +17,7 @@ async fn login() { .with_body_from_file("tests/data/login_response.json") .create(); - let mut client = AsyncClient::new(homeserver, None).unwrap(); + let client = AsyncClient::new(homeserver, None).unwrap(); client .login("example", "wordpass", None, None) @@ -46,7 +46,7 @@ async fn sync() { .with_body_from_file("tests/data/sync.json") .create(); - let mut client = AsyncClient::new(homeserver, Some(session)).unwrap(); + let client = AsyncClient::new(homeserver, Some(session)).unwrap(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); @@ -75,7 +75,7 @@ async fn room_names() { .with_body_from_file("tests/data/sync.json") .create(); - let mut client = AsyncClient::new(homeserver, Some(session)).unwrap(); + let client = AsyncClient::new(homeserver, Some(session)).unwrap(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); From 63368a9437a4b1ec6cc8276678ddfb2258590ab1 Mon Sep 17 00:00:00 2001 From: Devin R Date: Wed, 15 Apr 2020 08:44:29 -0400 Subject: [PATCH 10/11] command_bot: remove timestamp --- examples/command_bot.rs | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index 20bbf7a1..effd2e7e 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -1,8 +1,6 @@ use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; use std::{env, process::exit}; -use js_int::UInt; use matrix_sdk::{ self, events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, @@ -15,36 +13,28 @@ struct CommandBot { /// This clone of the `AsyncClient` will send requests to the server, /// while the other keeps us in sync with the server using `sync_forever`. client: AsyncClient, - /// A timestamp so we only respond to messages sent after the bot is running. - start_time: UInt, } impl CommandBot { pub fn new(client: AsyncClient) -> Self { - let now = SystemTime::now(); - let timestamp = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); - Self { - client, - start_time: UInt::new(timestamp as u64).unwrap(), - } + Self { client } } } #[async_trait::async_trait] impl EventEmitter for CommandBot { async fn on_room_message(&self, room: Arc>, event: &MessageEvent) { - let (msg_body, timestamp) = if let MessageEvent { + let msg_body = if let MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), - origin_server_ts, .. } = event { - (msg_body.clone(), *origin_server_ts) + msg_body.clone() } else { - (String::new(), UInt::min_value()) + String::new() }; - if msg_body.contains("!party") && timestamp > self.start_time { + if msg_body.contains("!party") { let content = MessageEventContent::Text(TextMessageEventContent { body: "🎉🎊🥳 let's PARTY!! 🥳🎊🎉".to_string(), format: None, From 9b52b58fea777426e5a8259eaf5ec664282e0c5c Mon Sep 17 00:00:00 2001 From: Devin R Date: Wed, 15 Apr 2020 09:54:53 -0400 Subject: [PATCH 11/11] command_bot: pass sync_token to sync_forever --- examples/command_bot.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/command_bot.rs b/examples/command_bot.rs index effd2e7e..220b0ea1 100644 --- a/examples/command_bot.rs +++ b/examples/command_bot.rs @@ -90,8 +90,11 @@ async fn login_and_sync( .add_event_emitter(Box::new(CommandBot::new(client.clone()))) .await; + // since we called sync before we `sync_forever` we must pass that sync token to + // `sync_forever` + let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); // this keeps state from the server streaming in to CommandBot via the EventEmitter trait - client.sync_forever(SyncSettings::new(), |_| async {}).await; + client.sync_forever(settings, |_| async {}).await; Ok(()) }