From 56128257623594181599e2c1e3de4d78f6bf33e9 Mon Sep 17 00:00:00 2001 From: Devin R Date: Tue, 31 Mar 2020 19:34:11 -0400 Subject: [PATCH] add EventEmitter trait, clean up imports so new warnings are easier to see --- Cargo.toml | 8 +- examples/login.rs | 54 +++---- src/async_client.rs | 247 ++++++-------------------------- src/base_client.rs | 275 ++++++++++++++++++++++++++++++++++-- src/event_emitter/mod.rs | 76 ++++++++++ src/lib.rs | 4 + src/models/mod.rs | 1 + src/models/room.rs | 14 +- src/models/room_member.rs | 19 +-- src/models/user.rs | 17 +-- tests/async_client_tests.rs | 1 - 11 files changed, 425 insertions(+), 291 deletions(-) create mode 100644 src/event_emitter/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 6a3b516c..6896942c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ version = "0.1.0" [features] default = [] -encryption = ["olm-rs", "serde/derive", "serde_json", "cjson", "async-trait"] +encryption = ["olm-rs", "serde/derive", "serde_json", "cjson"] sqlite-cryptostore = ["sqlx", "zeroize"] [dependencies] @@ -20,11 +20,13 @@ futures = "0.3.4" reqwest = "0.10.4" http = "0.2.1" url = "2.1.1" +async-trait = "0.1.26" + # Ruma dependencies js_int = "0.1.3" ruma-api = "0.15.0-dev.1" -ruma-client-api = { version = "0.6.0", git = "https://github.com/matrix-org/ruma-client-api/" } +ruma-client-api = { git = "https://github.com/matrix-org/ruma-client-api/", version = "0.6.0" } ruma-events = { git = "https://github.com/matrix-org/ruma-events", version = "0.17.0" } ruma-identifiers = "0.14.1" @@ -37,7 +39,6 @@ zeroize = { version = "1.1.0", optional = true } # Misc dependencies thiserror = "1.0.13" -async-trait = { version = "0.1.26", optional = true } tracing = "0.1.13" [dependencies.tracing-futures] @@ -61,3 +62,4 @@ tokio = { version = "0.2.13", features = ["rt-threaded", "macros"] } tracing-subscriber = "0.2.3" tempfile = "3.1.0" mockito = "0.23.3" + diff --git a/examples/login.rs b/examples/login.rs index 561795e6..5c48088c 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex}; use std::{env, process::exit}; use url::Url; @@ -7,34 +7,32 @@ use matrix_sdk::{ events::{ collections::all::RoomEvent, room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, - EventResult, }, - AsyncClient, AsyncClientConfig, Room, SyncSettings, + AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, }; -async fn async_cb(room: Arc>, event: Arc>) { - let room = room.read().unwrap(); - let event = if let EventResult::Ok(event) = &*event { - event - } else { - return; - }; - if let RoomEvent::RoomMessage(MessageEvent { - content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), - sender, - .. - }) = event - { - let member = room.members.get(&sender.to_string()).unwrap(); - println!( - "{}: {}", - member - .user - .display_name - .as_ref() - .unwrap_or(&sender.to_string()), - msg_body - ); +struct EventCallback; + +#[async_trait::async_trait] +impl EventEmitter for EventCallback { + async fn on_room_message(&mut self, room: &Room, event: &RoomEvent) { + if let RoomEvent::RoomMessage(MessageEvent { + content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), + sender, + .. + }) = event + { + let member = room.members.get(&sender.to_string()).unwrap(); + println!( + "{}: {}", + member + .user + .display_name + .as_ref() + .unwrap_or(&sender.to_string()), + msg_body + ); + } } } @@ -49,7 +47,9 @@ async fn login( let homeserver_url = Url::parse(&homeserver_url)?; let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); - client.add_event_callback(async_cb); + client + .add_event_emitter(Arc::new(Mutex::new(EventCallback))) + .await; client .login(username, password, None, Some("rust-sdk".to_string())) diff --git a/src/async_client.rs b/src/async_client.rs index 48aac0b0..ba88bd6d 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -17,13 +17,13 @@ use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::result::Result as StdResult; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, Mutex, RwLock as SyncLock}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use futures::future::{BoxFuture, Future, FutureExt}; use tokio::sync::RwLock; use tokio::time::delay_for as sleep; -use tracing::{debug, info, instrument, trace}; +use tracing::{info, instrument, trace}; use http::Method as HttpMethod; use http::Response as HttpResponse; @@ -31,8 +31,6 @@ use reqwest::header::{HeaderValue, InvalidHeaderValue}; use url::Url; use ruma_api::{Endpoint, Outgoing}; -use ruma_events::collections::all::RoomEvent; -use ruma_events::presence::PresenceEvent; use ruma_events::room::message::MessageEventContent; use ruma_events::EventResult; pub use ruma_events::EventType; @@ -43,19 +41,9 @@ use ruma_identifiers::{DeviceId, UserId}; use crate::api; use crate::base_client::Client as BaseClient; -use crate::models::Room; use crate::session::Session; use crate::VERSION; -use crate::{Error, Result}; - -type RoomEventCallback = Box< - dyn FnMut(Arc>, Arc>) -> BoxFuture<'static, ()> + Send, ->; - -type PresenceEventCallback = Box< - dyn FnMut(Arc>, Arc>) -> BoxFuture<'static, ()> - + Send, ->; +use crate::{Error, EventEmitter, Result}; const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); @@ -70,9 +58,6 @@ pub struct AsyncClient { pub(crate) base_client: Arc>, /// The transaction id. transaction_id: Arc, - /// Event callbacks - event_callbacks: Arc>>, - presence_callbacks: Arc>>, } impl std::fmt::Debug for AsyncClient { @@ -224,6 +209,7 @@ impl AsyncClient { session: Option, config: AsyncClientConfig, ) -> Result { + #[allow(clippy::match_wild_err_arm)] let homeserver: Url = match homeserver_url.try_into() { Ok(u) => u, Err(_e) => panic!("Error parsing homeserver url"), @@ -258,8 +244,6 @@ impl AsyncClient { http_client, base_client: Arc::new(RwLock::new(BaseClient::new(session)?)), transaction_id: Arc::new(AtomicU64::new(0)), - event_callbacks: Arc::new(Mutex::new(Vec::new())), - presence_callbacks: Arc::new(Mutex::new(Vec::new())), }) } @@ -275,156 +259,28 @@ impl AsyncClient { &self.homeserver } - /// Calculate the room name from a `RoomId`, returning a string. + /// Add `EventEmitter` to `AsyncClient`. + /// + /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. + pub async fn add_event_emitter(&mut self, emitter: Arc>>) { + self.base_client.write().await.event_emitter = Some(emitter); + } + + /// Calculates the room name from a `RoomId`, returning a string. pub async fn get_room_name(&self, room_id: &str) -> Option { self.base_client.read().await.calculate_room_name(room_id) } - /// Calculate the room names this client knows about. + /// Calculates the room names this client knows about. pub async fn get_room_names(&self) -> Vec { self.base_client.read().await.calculate_room_names() } - /// Calculate the room names this client knows about. + /// Calculates the room that the client last interacted with. pub async fn current_room_id(&self) -> Option { self.base_client.read().await.current_room_id() } - /// Add a callback that will be called every time the client receives a room - /// event - /// - /// # Arguments - /// - /// * `callback` - The callback that should be called once a RoomEvent is - /// received. - /// - /// # Examples - /// ``` - /// # use matrix_sdk::events::{ - /// # collections::all::RoomEvent, - /// # room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, - /// # EventResult, - /// # }; - /// # use matrix_sdk::Room; - /// # use std::sync::{Arc, RwLock}; - /// # use matrix_sdk::AsyncClient; - /// # use url::Url; - /// - /// async fn async_cb(room: Arc>, event: Arc>) { - /// let room = room.read().unwrap(); - /// let event = if let EventResult::Ok(event) = &*event { - /// event - /// } else { - /// return; - /// }; - /// if let RoomEvent::RoomMessage(MessageEvent { - /// content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), - /// sender, - /// .. - /// }) = event - /// { - /// let member = room.members.get(&sender.to_string()).unwrap(); - /// println!( - /// "{}: {}", - /// member.user.display_name.as_ref().unwrap_or(&sender.to_string()), - /// msg_body - /// ); - /// } - /// } - /// # fn main() -> Result<(), matrix_sdk::Error> { - /// let homeserver = Url::parse("http://localhost:8080")?; - /// - /// let mut client = AsyncClient::new(homeserver, None)?; - /// - /// client.add_event_callback(async_cb); - /// # Ok(()) - /// # } - /// ``` - pub fn add_event_callback( - &mut self, - mut callback: impl FnMut(Arc>, Arc>) -> C + 'static + Send, - ) where - C: Future + Send, - { - let mut futures = self.event_callbacks.lock().unwrap(); - - let future = move |room, event| callback(room, event).boxed(); - - futures.push(Box::new(future)); - } - - /// Add a callback that will be called every time the client receives a presence - /// event - /// - /// # Arguments - /// - /// * `callback` - The callback that should be called once a RoomEvent is - /// received. - /// - /// # Examples - /// ``` - /// # use matrix_sdk::events::{ - /// # collections::all::RoomEvent, - /// # room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, - /// # presence::{PresenceEvent, PresenceEventContent}, - /// # EventResult, - /// # }; - /// # use matrix_sdk::Room; - /// # use std::sync::{Arc, RwLock}; - /// # use matrix_sdk::AsyncClient; - /// # use url::Url; - /// - /// async fn async_cb(room: Arc>, event: Arc>) { - /// let room = room.read().unwrap(); - /// let event = if let EventResult::Ok(event) = &*event { - /// event - /// } else { - /// return; - /// }; - /// let PresenceEvent { - /// content: PresenceEventContent { - /// avatar_url, - /// currently_active, - /// displayname, - /// last_active_ago, - /// presence, - /// status_msg, - /// }, - /// sender, - /// } = event; - /// { - /// let member = room.members.get(&sender.to_string()).unwrap(); - /// println!( - /// "{} is {}", - /// displayname.as_deref().unwrap_or(&sender.to_string()), - /// status_msg.as_deref().unwrap_or("not here") - /// ); - /// } - /// } - /// # fn main() -> Result<(), matrix_sdk::Error> { - /// let homeserver = Url::parse("http://localhost:8080")?; - /// - /// let mut client = AsyncClient::new(homeserver, None)?; - /// - /// client.add_presence_callback(async_cb); - /// # Ok(()) - /// # } - /// ``` - pub fn add_presence_callback( - &mut self, - mut callback: impl FnMut(Arc>, Arc>) -> C - + 'static - + Send, - ) where - C: Future + Send, - { - let mut futures = self.presence_callbacks.lock().unwrap(); - - let future = move |room, event| callback(room, event).boxed(); - - futures.push(Box::new(future)); - } - /// Login to the server. /// /// # Arguments @@ -486,9 +342,9 @@ impl AsyncClient { for (room_id, room) in &mut response.rooms.join { let room_id_string = room_id.to_string(); - let matrix_room = { - let mut client = self.base_client.write().await; + let mut client = self.base_client.write().await; + let _matrix_room = { for event in &room.state.events { if let EventResult::Ok(e) = event { client.receive_joined_state_event(&room_id_string, &e); @@ -498,9 +354,16 @@ impl AsyncClient { client.get_or_create_room(&room_id_string).clone() }; + // TODO should we determine if anything room state has changed before calling + // re looping is not ideal here + for event in &mut room.state.events { + if let EventResult::Ok(e) = event { + client.emit_state_event(room_id, e).await; + } + } + for mut event in &mut room.timeline.events { let decrypted_event = { - let mut client = self.base_client.write().await; client .receive_joined_timeline_event(room_id, &mut event) .await @@ -510,35 +373,21 @@ impl AsyncClient { *event = e; } - let callbacks = { - let mut cb_futures = self.event_callbacks.lock().unwrap(); - - let event = if !cb_futures.is_empty() { - Arc::new(event.clone()) - } else { - continue; - }; - - let mut callbacks = Vec::new(); - - for cb in &mut cb_futures.iter_mut() { - callbacks.push(cb(matrix_room.clone(), Arc::clone(&event))); - } - - callbacks - }; - - for cb in callbacks { - cb.await; + // TODO should we determine if any room state has changed before calling + if let EventResult::Ok(e) = event { + client.emit_timeline_event(room_id, e).await; } } // look at AccountData to further cut down users by collecting ignored users - // TODO actually use the ignored users - for account_data in &room.account_data.events { - let mut client = self.base_client.write().await; - if let EventResult::Ok(e) = account_data { - client.receive_account_data(&room_id_string, e); + for account_data in &mut room.account_data.events { + { + if let EventResult::Ok(e) = account_data { + client.receive_account_data(&room_id_string, e); + + // TODO should we determine if anything room state has changed before calling + client.emit_account_data_event(room_id, e).await; + } } } @@ -547,28 +396,14 @@ impl AsyncClient { // After the room has been created and state/timeline events accounted for we use the room_id of the newly created // room to add any presence events that relate to a user in the current room. This is not super // efficient but we need a room_id so we would loop through now or later. - for presence in &response.presence.events { - let mut client = self.base_client.write().await; - if let EventResult::Ok(e) = presence { - client.receive_presence_event(&room_id_string, e); - } + for presence in &mut response.presence.events { + { + if let EventResult::Ok(e) = presence { + client.receive_presence_event(&room_id_string, e); - let callbacks = { - let mut cb_futures = self.presence_callbacks.lock().unwrap(); - let event = if !cb_futures.is_empty() { - Arc::new(presence.clone()) - } else { - continue; - }; - let mut callbacks = Vec::new(); - for cb in &mut cb_futures.iter_mut() { - callbacks.push(cb(matrix_room.clone(), Arc::clone(&event))); + // TODO should we determine if any room state has changed before calling + client.emit_presence_event(room_id, e).await; } - - callbacks - }; - for cb in callbacks { - cb.await; } } } diff --git a/src/base_client.rs b/src/base_client.rs index 1572356e..e3cfb2fa 100644 --- a/src/base_client.rs +++ b/src/base_client.rs @@ -15,6 +15,8 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; +use std::fmt; +use std::sync::{Arc, RwLock}; #[cfg(feature = "encryption")] use std::result::Result as StdResult; @@ -27,21 +29,13 @@ use crate::events::presence::PresenceEvent; use crate::events::collections::only::Event as NonRoomEvent; use crate::events::ignored_user_list::IgnoredUserListEvent; use crate::events::push_rules::{PushRulesEvent, Ruleset}; -use crate::events::room::{ - aliases::AliasesEvent, - canonical_alias::CanonicalAliasEvent, - member::{MemberEvent, MembershipState}, - name::NameEvent, -}; use crate::events::EventResult; use crate::identifiers::{RoomAliasId, UserId as Uid}; use crate::models::Room; use crate::session::Session; -use std::sync::{Arc, RwLock}; +use crate::EventEmitter; use js_int::UInt; - -#[cfg(feature = "encryption")] use tokio::sync::Mutex; #[cfg(feature = "encryption")] @@ -94,7 +88,6 @@ impl CurrentRoom { } } -#[derive(Debug)] /// A no IO Client implementation. /// /// This Client is a state machine that receives responses and events and @@ -113,11 +106,28 @@ pub struct Client { pub ignored_users: Vec, /// The push ruleset for the logged in user. pub push_ruleset: Option, + /// Any implementor of EventEmitter will act as the callbacks for various + /// events. + pub event_emitter: Option>>>, #[cfg(feature = "encryption")] olm: Arc>>, } +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Client") + .field("session", &self.session) + .field("sync_token", &self.sync_token) + .field("joined_rooms", &self.joined_rooms) + .field("current_room_id", &self.current_room_id) + .field("ignored_users", &self.ignored_users) + .field("push_ruleset", &self.push_ruleset) + .field("event_emitter", &"EventEmitter<...>") + .finish() + } +} + impl Client { /// Create a new client. /// @@ -139,6 +149,7 @@ impl Client { current_room_id: CurrentRoom::default(), ignored_users: Vec::new(), push_ruleset: None, + event_emitter: None, #[cfg(feature = "encryption")] olm: Arc::new(Mutex::new(olm)), }) @@ -213,8 +224,8 @@ impl Client { )))) } - pub(crate) fn get_room(&mut self, room_id: &str) -> Option<&mut Arc>> { - self.joined_rooms.get_mut(room_id) + pub(crate) fn get_room(&self, room_id: &str) -> Option<&Arc>> { + self.joined_rooms.get(room_id) } /// Handle a m.ignored_user_list event, updating the room state if necessary. @@ -489,18 +500,252 @@ impl Client { // TODO notify our callers of new devices via some callback. Ok(()) } + + pub(crate) async fn emit_timeline_event(&mut self, room_id: &RoomId, event: &mut RoomEvent) { + match event { + RoomEvent::RoomMember(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_member(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomName(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_name(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomCanonicalAlias(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_canonical_alias(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomAliases(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_aliases(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomAvatar(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_avatar(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomMessage(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_message(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomMessageFeedback(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_message_feedback(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomRedaction(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_redaction(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + RoomEvent::RoomPowerLevels(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_room_power_levels(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + _ => {} + } + } + + pub(crate) async fn emit_state_event(&mut self, room_id: &RoomId, event: &mut StateEvent) { + match event { + StateEvent::RoomMember(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_state_member(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + StateEvent::RoomName(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_state_name(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + StateEvent::RoomCanonicalAlias(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_state_canonical_alias(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + StateEvent::RoomAliases(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_state_aliases(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + StateEvent::RoomAvatar(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_state_avatar(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + StateEvent::RoomPowerLevels(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_state_power_levels(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + StateEvent::RoomJoinRules(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_state_join_rules(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + _ => {} + } + } + + pub(crate) async fn emit_account_data_event( + &mut self, + room_id: &RoomId, + event: &mut NonRoomEvent, + ) { + match event { + NonRoomEvent::Presence(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_account_presence(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + NonRoomEvent::IgnoredUserList(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_account_ignored_users(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + NonRoomEvent::PushRules(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_account_push_rules(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + NonRoomEvent::FullyRead(_) => { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_account_data_fully_read(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } + _ => {} + } + } + + pub(crate) async fn emit_presence_event( + &mut self, + room_id: &RoomId, + event: &mut PresenceEvent, + ) { + if let Some(ee) = &self.event_emitter { + if let Some(room) = self.get_room(&room_id.to_string()) { + ee.lock() + .await + .on_presence_event(Arc::clone(&room), Arc::new(Mutex::new(event.clone()))) + .await; + } + } + } } #[cfg(test)] mod test { - use super::*; - use crate::events::room::member::MembershipState; use crate::identifiers::UserId; use crate::{AsyncClient, Session, SyncSettings}; use mockito::{mock, Matcher}; - use tokio::runtime::Runtime; use url::Url; use std::convert::TryFrom; diff --git a/src/event_emitter/mod.rs b/src/event_emitter/mod.rs new file mode 100644 index 00000000..fd5764a8 --- /dev/null +++ b/src/event_emitter/mod.rs @@ -0,0 +1,76 @@ +// Copyright 2020 Damir Jelić +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, RwLock}; + +use crate::events::collections::all::{RoomEvent, StateEvent}; +use crate::events::collections::only::Event as NonRoomEvent; +use crate::events::presence::PresenceEvent; +use crate::models::Room; + +use tokio::sync::Mutex; +/// +#[async_trait::async_trait] +pub trait EventEmitter: Send + Sync { + // ROOM EVENTS from `IncomingTimeline` + /// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event. + async fn on_room_member(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event. + async fn on_room_name(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event. + async fn on_room_canonical_alias(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event. + async fn on_room_aliases(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event. + async fn on_room_avatar(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event. + async fn on_room_message(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event. + async fn on_room_message_feedback(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event. + async fn on_room_redaction(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event. + async fn on_room_power_levels(&mut self, _: Arc>, _: Arc>) {} + + // `RoomEvent`s from `IncomingState` + /// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event. + async fn on_state_member(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `StateEvent::RoomName` event. + async fn on_state_name(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event. + async fn on_state_canonical_alias(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event. + async fn on_state_aliases(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event. + async fn on_state_avatar(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event. + async fn on_state_power_levels(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event. + async fn on_state_join_rules(&mut self, _: Arc>, _: Arc>) {} + + // `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData` + /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event. + async fn on_account_presence(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event. + async fn on_account_ignored_users(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event. + async fn on_account_push_rules(&mut self, _: Arc>, _: Arc>) {} + /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. + async fn on_account_data_fully_read(&mut self, _: Arc>, _: Arc>) {} + + // `PresenceEvent` is a struct so there is only the one method + /// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event. + async fn on_presence_event(&mut self, _: Arc>, _: Arc>) {} +} diff --git a/src/lib.rs b/src/lib.rs index c8813589..69b29c1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,9 +32,12 @@ pub use ruma_client_api as api; pub use ruma_events as events; pub use ruma_identifiers as identifiers; +pub use ruma_api as ruma_traits; + mod async_client; mod base_client; mod error; +mod event_emitter; mod models; mod session; @@ -43,6 +46,7 @@ mod crypto; pub use async_client::{AsyncClient, AsyncClientConfig, SyncSettings}; pub use base_client::Client; +pub use event_emitter::EventEmitter; pub use models::Room; pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/src/models/mod.rs b/src/models/mod.rs index 8fbc08d4..7dc4a330 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -6,5 +6,6 @@ pub use room::{Room, RoomName}; pub use room_member::RoomMember; pub use user::User; +#[allow(dead_code)] pub type Token = String; pub type UserId = String; diff --git a/src/models/room.rs b/src/models/room.rs index 6b983b7f..d8355b4d 100644 --- a/src/models/room.rs +++ b/src/models/room.rs @@ -14,11 +14,11 @@ // limitations under the License. use std::collections::HashMap; -use std::sync::{Arc, RwLock}; -use super::{RoomMember, User, UserId}; +use super::{RoomMember, UserId}; use crate::events::collections::all::{RoomEvent, StateEvent}; +use crate::events::presence::PresenceEvent; use crate::events::room::{ aliases::AliasesEvent, canonical_alias::CanonicalAliasEvent, @@ -27,12 +27,7 @@ use crate::events::room::{ name::NameEvent, power_levels::PowerLevelsEvent, }; -use crate::events::{ - presence::{PresenceEvent, PresenceEventContent}, - EventResult, -}; use crate::identifiers::RoomAliasId; -use crate::session::Session; use js_int::UInt; @@ -317,14 +312,11 @@ impl Room { #[cfg(test)] mod test { - use super::*; - use crate::events::room::member::MembershipState; use crate::identifiers::UserId; use crate::{AsyncClient, Session, SyncSettings}; use mockito::{mock, Matcher}; - use tokio::runtime::Runtime; use url::Url; use std::convert::TryFrom; @@ -363,7 +355,7 @@ mod test { .unwrap(); assert_eq!(2, room.members.len()); - for (id, member) in &room.members { + for (_id, member) in &room.members { assert_eq!(MembershipState::Join, member.membership); } } diff --git a/src/models/room_member.rs b/src/models/room_member.rs index 91c2b71d..6330f03a 100644 --- a/src/models/room_member.rs +++ b/src/models/room_member.rs @@ -16,20 +16,14 @@ use std::convert::TryFrom; use super::User; -use crate::api::r0 as api; -use crate::events::collections::all::{Event, RoomEvent, StateEvent}; +use crate::events::collections::all::Event; use crate::events::room::{ - aliases::AliasesEvent, - canonical_alias::CanonicalAliasEvent, - member::{MemberEvent, MemberEventContent, MembershipChange, MembershipState}, - name::NameEvent, + member::{MemberEvent, MembershipChange, MembershipState}, power_levels::PowerLevelsEvent, }; -use crate::events::EventResult; -use crate::identifiers::{RoomAliasId, UserId}; -use crate::session::Session; +use crate::identifiers::UserId; -use js_int::{Int, UInt}; +use js_int::Int; #[cfg(feature = "encryption")] use tokio::sync::Mutex; @@ -108,7 +102,7 @@ impl RoomMember { max_power = *power.max(&max_power); } - let mut changed = false; + let changed; if let Some(user_power) = event.content.users.get(&self.user_id) { changed = self.power_level != Some(*user_power); self.power_level = Some(*user_power); @@ -134,7 +128,6 @@ mod test { use js_int::{Int, UInt}; use mockito::{mock, Matcher}; - use tokio::runtime::Runtime; use url::Url; use std::collections::HashMap; @@ -177,7 +170,7 @@ mod test { .write() .unwrap(); - for (id, member) in &mut room.members { + for (_id, member) in &mut room.members { let power = power_levels(); assert!(member.update_power(&power)); assert_eq!(MembershipState::Join, member.membership); diff --git a/src/models/user.rs b/src/models/user.rs index 37c123a1..93221235 100644 --- a/src/models/user.rs +++ b/src/models/user.rs @@ -13,22 +13,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - -use super::UserId; -use crate::api::r0 as api; -use crate::events::collections::all::{Event, RoomEvent, StateEvent}; +use crate::events::collections::all::Event; use crate::events::presence::{PresenceEvent, PresenceEventContent, PresenceState}; -use crate::events::room::{ - aliases::AliasesEvent, - canonical_alias::CanonicalAliasEvent, - member::{MemberEvent, MembershipState}, - name::NameEvent, -}; -use crate::events::EventResult; -use crate::identifiers::RoomAliasId; -use crate::session::Session; +use crate::events::room::member::MemberEvent; use js_int::UInt; #[cfg(feature = "encryption")] diff --git a/tests/async_client_tests.rs b/tests/async_client_tests.rs index 5403eea7..f152c82e 100644 --- a/tests/async_client_tests.rs +++ b/tests/async_client_tests.rs @@ -2,7 +2,6 @@ use matrix_sdk::identifiers::UserId; use matrix_sdk::{AsyncClient, Session, SyncSettings}; use mockito::{mock, Matcher}; -use tokio::runtime::Runtime; use url::Url; use std::convert::TryFrom;