add EventEmitter trait, clean up imports so new warnings are easier to see

master
Devin R 2020-03-31 19:34:11 -04:00
parent 5e22eb9faf
commit 5612825762
11 changed files with 425 additions and 291 deletions

View File

@ -12,7 +12,7 @@ version = "0.1.0"
[features] [features]
default = [] default = []
encryption = ["olm-rs", "serde/derive", "serde_json", "cjson", "async-trait"] encryption = ["olm-rs", "serde/derive", "serde_json", "cjson"]
sqlite-cryptostore = ["sqlx", "zeroize"] sqlite-cryptostore = ["sqlx", "zeroize"]
[dependencies] [dependencies]
@ -20,11 +20,13 @@ futures = "0.3.4"
reqwest = "0.10.4" reqwest = "0.10.4"
http = "0.2.1" http = "0.2.1"
url = "2.1.1" url = "2.1.1"
async-trait = "0.1.26"
# Ruma dependencies # Ruma dependencies
js_int = "0.1.3" js_int = "0.1.3"
ruma-api = "0.15.0-dev.1" ruma-api = "0.15.0-dev.1"
ruma-client-api = { version = "0.6.0", git = "https://github.com/matrix-org/ruma-client-api/" } ruma-client-api = { git = "https://github.com/matrix-org/ruma-client-api/", version = "0.6.0" }
ruma-events = { git = "https://github.com/matrix-org/ruma-events", version = "0.17.0" } ruma-events = { git = "https://github.com/matrix-org/ruma-events", version = "0.17.0" }
ruma-identifiers = "0.14.1" ruma-identifiers = "0.14.1"
@ -37,7 +39,6 @@ zeroize = { version = "1.1.0", optional = true }
# Misc dependencies # Misc dependencies
thiserror = "1.0.13" thiserror = "1.0.13"
async-trait = { version = "0.1.26", optional = true }
tracing = "0.1.13" tracing = "0.1.13"
[dependencies.tracing-futures] [dependencies.tracing-futures]
@ -61,3 +62,4 @@ tokio = { version = "0.2.13", features = ["rt-threaded", "macros"] }
tracing-subscriber = "0.2.3" tracing-subscriber = "0.2.3"
tempfile = "3.1.0" tempfile = "3.1.0"
mockito = "0.23.3" mockito = "0.23.3"

View File

@ -1,4 +1,4 @@
use std::sync::{Arc, RwLock}; use std::sync::{Arc, Mutex};
use std::{env, process::exit}; use std::{env, process::exit};
use url::Url; use url::Url;
@ -7,34 +7,32 @@ use matrix_sdk::{
events::{ events::{
collections::all::RoomEvent, collections::all::RoomEvent,
room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, room::message::{MessageEvent, MessageEventContent, TextMessageEventContent},
EventResult,
}, },
AsyncClient, AsyncClientConfig, Room, SyncSettings, AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings,
}; };
async fn async_cb(room: Arc<RwLock<Room>>, event: Arc<EventResult<RoomEvent>>) { struct EventCallback;
let room = room.read().unwrap();
let event = if let EventResult::Ok(event) = &*event { #[async_trait::async_trait]
event impl EventEmitter for EventCallback {
} else { async fn on_room_message(&mut self, room: &Room, event: &RoomEvent) {
return; if let RoomEvent::RoomMessage(MessageEvent {
}; content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }),
if let RoomEvent::RoomMessage(MessageEvent { sender,
content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), ..
sender, }) = event
.. {
}) = event let member = room.members.get(&sender.to_string()).unwrap();
{ println!(
let member = room.members.get(&sender.to_string()).unwrap(); "{}: {}",
println!( member
"{}: {}", .user
member .display_name
.user .as_ref()
.display_name .unwrap_or(&sender.to_string()),
.as_ref() msg_body
.unwrap_or(&sender.to_string()), );
msg_body }
);
} }
} }
@ -49,7 +47,9 @@ async fn login(
let homeserver_url = Url::parse(&homeserver_url)?; let homeserver_url = Url::parse(&homeserver_url)?;
let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap(); let mut client = AsyncClient::new_with_config(homeserver_url, None, client_config).unwrap();
client.add_event_callback(async_cb); client
.add_event_emitter(Arc::new(Mutex::new(EventCallback)))
.await;
client client
.login(username, password, None, Some("rust-sdk".to_string())) .login(username, password, None, Some("rust-sdk".to_string()))

View File

@ -17,13 +17,13 @@ use std::collections::HashMap;
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::result::Result as StdResult; use std::result::Result as StdResult;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock as SyncLock}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures::future::{BoxFuture, Future, FutureExt}; use futures::future::{BoxFuture, Future, FutureExt};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::time::delay_for as sleep; use tokio::time::delay_for as sleep;
use tracing::{debug, info, instrument, trace}; use tracing::{info, instrument, trace};
use http::Method as HttpMethod; use http::Method as HttpMethod;
use http::Response as HttpResponse; use http::Response as HttpResponse;
@ -31,8 +31,6 @@ use reqwest::header::{HeaderValue, InvalidHeaderValue};
use url::Url; use url::Url;
use ruma_api::{Endpoint, Outgoing}; use ruma_api::{Endpoint, Outgoing};
use ruma_events::collections::all::RoomEvent;
use ruma_events::presence::PresenceEvent;
use ruma_events::room::message::MessageEventContent; use ruma_events::room::message::MessageEventContent;
use ruma_events::EventResult; use ruma_events::EventResult;
pub use ruma_events::EventType; pub use ruma_events::EventType;
@ -43,19 +41,9 @@ use ruma_identifiers::{DeviceId, UserId};
use crate::api; use crate::api;
use crate::base_client::Client as BaseClient; use crate::base_client::Client as BaseClient;
use crate::models::Room;
use crate::session::Session; use crate::session::Session;
use crate::VERSION; use crate::VERSION;
use crate::{Error, Result}; use crate::{Error, EventEmitter, Result};
type RoomEventCallback = Box<
dyn FnMut(Arc<SyncLock<Room>>, Arc<EventResult<RoomEvent>>) -> BoxFuture<'static, ()> + Send,
>;
type PresenceEventCallback = Box<
dyn FnMut(Arc<SyncLock<Room>>, Arc<EventResult<PresenceEvent>>) -> BoxFuture<'static, ()>
+ Send,
>;
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
@ -70,9 +58,6 @@ pub struct AsyncClient {
pub(crate) base_client: Arc<RwLock<BaseClient>>, pub(crate) base_client: Arc<RwLock<BaseClient>>,
/// The transaction id. /// The transaction id.
transaction_id: Arc<AtomicU64>, transaction_id: Arc<AtomicU64>,
/// Event callbacks
event_callbacks: Arc<Mutex<Vec<RoomEventCallback>>>,
presence_callbacks: Arc<Mutex<Vec<PresenceEventCallback>>>,
} }
impl std::fmt::Debug for AsyncClient { impl std::fmt::Debug for AsyncClient {
@ -224,6 +209,7 @@ impl AsyncClient {
session: Option<Session>, session: Option<Session>,
config: AsyncClientConfig, config: AsyncClientConfig,
) -> Result<Self> { ) -> Result<Self> {
#[allow(clippy::match_wild_err_arm)]
let homeserver: Url = match homeserver_url.try_into() { let homeserver: Url = match homeserver_url.try_into() {
Ok(u) => u, Ok(u) => u,
Err(_e) => panic!("Error parsing homeserver url"), Err(_e) => panic!("Error parsing homeserver url"),
@ -258,8 +244,6 @@ impl AsyncClient {
http_client, http_client,
base_client: Arc::new(RwLock::new(BaseClient::new(session)?)), base_client: Arc::new(RwLock::new(BaseClient::new(session)?)),
transaction_id: Arc::new(AtomicU64::new(0)), transaction_id: Arc::new(AtomicU64::new(0)),
event_callbacks: Arc::new(Mutex::new(Vec::new())),
presence_callbacks: Arc::new(Mutex::new(Vec::new())),
}) })
} }
@ -275,156 +259,28 @@ impl AsyncClient {
&self.homeserver &self.homeserver
} }
/// Calculate the room name from a `RoomId`, returning a string. /// Add `EventEmitter` to `AsyncClient`.
///
/// The methods of `EventEmitter` are called when the respective `RoomEvents` occur.
pub async fn add_event_emitter(&mut self, emitter: Arc<tokio::sync::Mutex<Box<dyn EventEmitter>>>) {
self.base_client.write().await.event_emitter = Some(emitter);
}
/// Calculates the room name from a `RoomId`, returning a string.
pub async fn get_room_name(&self, room_id: &str) -> Option<String> { pub async fn get_room_name(&self, room_id: &str) -> Option<String> {
self.base_client.read().await.calculate_room_name(room_id) self.base_client.read().await.calculate_room_name(room_id)
} }
/// Calculate the room names this client knows about. /// Calculates the room names this client knows about.
pub async fn get_room_names(&self) -> Vec<String> { pub async fn get_room_names(&self) -> Vec<String> {
self.base_client.read().await.calculate_room_names() self.base_client.read().await.calculate_room_names()
} }
/// Calculate the room names this client knows about. /// Calculates the room that the client last interacted with.
pub async fn current_room_id(&self) -> Option<RoomId> { pub async fn current_room_id(&self) -> Option<RoomId> {
self.base_client.read().await.current_room_id() self.base_client.read().await.current_room_id()
} }
/// Add a callback that will be called every time the client receives a room
/// event
///
/// # Arguments
///
/// * `callback` - The callback that should be called once a RoomEvent is
/// received.
///
/// # Examples
/// ```
/// # use matrix_sdk::events::{
/// # collections::all::RoomEvent,
/// # room::message::{MessageEvent, MessageEventContent, TextMessageEventContent},
/// # EventResult,
/// # };
/// # use matrix_sdk::Room;
/// # use std::sync::{Arc, RwLock};
/// # use matrix_sdk::AsyncClient;
/// # use url::Url;
///
/// async fn async_cb(room: Arc<RwLock<Room>>, event: Arc<EventResult<RoomEvent>>) {
/// let room = room.read().unwrap();
/// let event = if let EventResult::Ok(event) = &*event {
/// event
/// } else {
/// return;
/// };
/// if let RoomEvent::RoomMessage(MessageEvent {
/// content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }),
/// sender,
/// ..
/// }) = event
/// {
/// let member = room.members.get(&sender.to_string()).unwrap();
/// println!(
/// "{}: {}",
/// member.user.display_name.as_ref().unwrap_or(&sender.to_string()),
/// msg_body
/// );
/// }
/// }
/// # fn main() -> Result<(), matrix_sdk::Error> {
/// let homeserver = Url::parse("http://localhost:8080")?;
///
/// let mut client = AsyncClient::new(homeserver, None)?;
///
/// client.add_event_callback(async_cb);
/// # Ok(())
/// # }
/// ```
pub fn add_event_callback<C: 'static>(
&mut self,
mut callback: impl FnMut(Arc<SyncLock<Room>>, Arc<EventResult<RoomEvent>>) -> C + 'static + Send,
) where
C: Future<Output = ()> + Send,
{
let mut futures = self.event_callbacks.lock().unwrap();
let future = move |room, event| callback(room, event).boxed();
futures.push(Box::new(future));
}
/// Add a callback that will be called every time the client receives a presence
/// event
///
/// # Arguments
///
/// * `callback` - The callback that should be called once a RoomEvent is
/// received.
///
/// # Examples
/// ```
/// # use matrix_sdk::events::{
/// # collections::all::RoomEvent,
/// # room::message::{MessageEvent, MessageEventContent, TextMessageEventContent},
/// # presence::{PresenceEvent, PresenceEventContent},
/// # EventResult,
/// # };
/// # use matrix_sdk::Room;
/// # use std::sync::{Arc, RwLock};
/// # use matrix_sdk::AsyncClient;
/// # use url::Url;
///
/// async fn async_cb(room: Arc<RwLock<Room>>, event: Arc<EventResult<PresenceEvent>>) {
/// let room = room.read().unwrap();
/// let event = if let EventResult::Ok(event) = &*event {
/// event
/// } else {
/// return;
/// };
/// let PresenceEvent {
/// content: PresenceEventContent {
/// avatar_url,
/// currently_active,
/// displayname,
/// last_active_ago,
/// presence,
/// status_msg,
/// },
/// sender,
/// } = event;
/// {
/// let member = room.members.get(&sender.to_string()).unwrap();
/// println!(
/// "{} is {}",
/// displayname.as_deref().unwrap_or(&sender.to_string()),
/// status_msg.as_deref().unwrap_or("not here")
/// );
/// }
/// }
/// # fn main() -> Result<(), matrix_sdk::Error> {
/// let homeserver = Url::parse("http://localhost:8080")?;
///
/// let mut client = AsyncClient::new(homeserver, None)?;
///
/// client.add_presence_callback(async_cb);
/// # Ok(())
/// # }
/// ```
pub fn add_presence_callback<C: 'static>(
&mut self,
mut callback: impl FnMut(Arc<SyncLock<Room>>, Arc<EventResult<PresenceEvent>>) -> C
+ 'static
+ Send,
) where
C: Future<Output = ()> + Send,
{
let mut futures = self.presence_callbacks.lock().unwrap();
let future = move |room, event| callback(room, event).boxed();
futures.push(Box::new(future));
}
/// Login to the server. /// Login to the server.
/// ///
/// # Arguments /// # Arguments
@ -486,9 +342,9 @@ impl AsyncClient {
for (room_id, room) in &mut response.rooms.join { for (room_id, room) in &mut response.rooms.join {
let room_id_string = room_id.to_string(); let room_id_string = room_id.to_string();
let matrix_room = { let mut client = self.base_client.write().await;
let mut client = self.base_client.write().await;
let _matrix_room = {
for event in &room.state.events { for event in &room.state.events {
if let EventResult::Ok(e) = event { if let EventResult::Ok(e) = event {
client.receive_joined_state_event(&room_id_string, &e); client.receive_joined_state_event(&room_id_string, &e);
@ -498,9 +354,16 @@ impl AsyncClient {
client.get_or_create_room(&room_id_string).clone() client.get_or_create_room(&room_id_string).clone()
}; };
// TODO should we determine if anything room state has changed before calling
// re looping is not ideal here
for event in &mut room.state.events {
if let EventResult::Ok(e) = event {
client.emit_state_event(room_id, e).await;
}
}
for mut event in &mut room.timeline.events { for mut event in &mut room.timeline.events {
let decrypted_event = { let decrypted_event = {
let mut client = self.base_client.write().await;
client client
.receive_joined_timeline_event(room_id, &mut event) .receive_joined_timeline_event(room_id, &mut event)
.await .await
@ -510,35 +373,21 @@ impl AsyncClient {
*event = e; *event = e;
} }
let callbacks = { // TODO should we determine if any room state has changed before calling
let mut cb_futures = self.event_callbacks.lock().unwrap(); if let EventResult::Ok(e) = event {
client.emit_timeline_event(room_id, e).await;
let event = if !cb_futures.is_empty() {
Arc::new(event.clone())
} else {
continue;
};
let mut callbacks = Vec::new();
for cb in &mut cb_futures.iter_mut() {
callbacks.push(cb(matrix_room.clone(), Arc::clone(&event)));
}
callbacks
};
for cb in callbacks {
cb.await;
} }
} }
// look at AccountData to further cut down users by collecting ignored users // look at AccountData to further cut down users by collecting ignored users
// TODO actually use the ignored users for account_data in &mut room.account_data.events {
for account_data in &room.account_data.events { {
let mut client = self.base_client.write().await; if let EventResult::Ok(e) = account_data {
if let EventResult::Ok(e) = account_data { client.receive_account_data(&room_id_string, e);
client.receive_account_data(&room_id_string, e);
// TODO should we determine if anything room state has changed before calling
client.emit_account_data_event(room_id, e).await;
}
} }
} }
@ -547,28 +396,14 @@ impl AsyncClient {
// After the room has been created and state/timeline events accounted for we use the room_id of the newly created // After the room has been created and state/timeline events accounted for we use the room_id of the newly created
// room to add any presence events that relate to a user in the current room. This is not super // room to add any presence events that relate to a user in the current room. This is not super
// efficient but we need a room_id so we would loop through now or later. // efficient but we need a room_id so we would loop through now or later.
for presence in &response.presence.events { for presence in &mut response.presence.events {
let mut client = self.base_client.write().await; {
if let EventResult::Ok(e) = presence { if let EventResult::Ok(e) = presence {
client.receive_presence_event(&room_id_string, e); client.receive_presence_event(&room_id_string, e);
}
let callbacks = { // TODO should we determine if any room state has changed before calling
let mut cb_futures = self.presence_callbacks.lock().unwrap(); client.emit_presence_event(room_id, e).await;
let event = if !cb_futures.is_empty() {
Arc::new(presence.clone())
} else {
continue;
};
let mut callbacks = Vec::new();
for cb in &mut cb_futures.iter_mut() {
callbacks.push(cb(matrix_room.clone(), Arc::clone(&event)));
} }
callbacks
};
for cb in callbacks {
cb.await;
} }
} }
} }

View File

@ -15,6 +15,8 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt;
use std::sync::{Arc, RwLock};
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use std::result::Result as StdResult; use std::result::Result as StdResult;
@ -27,21 +29,13 @@ use crate::events::presence::PresenceEvent;
use crate::events::collections::only::Event as NonRoomEvent; use crate::events::collections::only::Event as NonRoomEvent;
use crate::events::ignored_user_list::IgnoredUserListEvent; use crate::events::ignored_user_list::IgnoredUserListEvent;
use crate::events::push_rules::{PushRulesEvent, Ruleset}; use crate::events::push_rules::{PushRulesEvent, Ruleset};
use crate::events::room::{
aliases::AliasesEvent,
canonical_alias::CanonicalAliasEvent,
member::{MemberEvent, MembershipState},
name::NameEvent,
};
use crate::events::EventResult; use crate::events::EventResult;
use crate::identifiers::{RoomAliasId, UserId as Uid}; use crate::identifiers::{RoomAliasId, UserId as Uid};
use crate::models::Room; use crate::models::Room;
use crate::session::Session; use crate::session::Session;
use std::sync::{Arc, RwLock}; use crate::EventEmitter;
use js_int::UInt; use js_int::UInt;
#[cfg(feature = "encryption")]
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
@ -94,7 +88,6 @@ impl CurrentRoom {
} }
} }
#[derive(Debug)]
/// A no IO Client implementation. /// A no IO Client implementation.
/// ///
/// This Client is a state machine that receives responses and events and /// This Client is a state machine that receives responses and events and
@ -113,11 +106,28 @@ pub struct Client {
pub ignored_users: Vec<UserId>, pub ignored_users: Vec<UserId>,
/// The push ruleset for the logged in user. /// The push ruleset for the logged in user.
pub push_ruleset: Option<Ruleset>, pub push_ruleset: Option<Ruleset>,
/// Any implementor of EventEmitter will act as the callbacks for various
/// events.
pub event_emitter: Option<Arc<Mutex<Box<dyn EventEmitter>>>>,
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
olm: Arc<Mutex<Option<OlmMachine>>>, olm: Arc<Mutex<Option<OlmMachine>>>,
} }
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("session", &self.session)
.field("sync_token", &self.sync_token)
.field("joined_rooms", &self.joined_rooms)
.field("current_room_id", &self.current_room_id)
.field("ignored_users", &self.ignored_users)
.field("push_ruleset", &self.push_ruleset)
.field("event_emitter", &"EventEmitter<...>")
.finish()
}
}
impl Client { impl Client {
/// Create a new client. /// Create a new client.
/// ///
@ -139,6 +149,7 @@ impl Client {
current_room_id: CurrentRoom::default(), current_room_id: CurrentRoom::default(),
ignored_users: Vec::new(), ignored_users: Vec::new(),
push_ruleset: None, push_ruleset: None,
event_emitter: None,
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
olm: Arc::new(Mutex::new(olm)), olm: Arc::new(Mutex::new(olm)),
}) })
@ -213,8 +224,8 @@ impl Client {
)))) ))))
} }
pub(crate) fn get_room(&mut self, room_id: &str) -> Option<&mut Arc<RwLock<Room>>> { pub(crate) fn get_room(&self, room_id: &str) -> Option<&Arc<RwLock<Room>>> {
self.joined_rooms.get_mut(room_id) self.joined_rooms.get(room_id)
} }
/// Handle a m.ignored_user_list event, updating the room state if necessary. /// Handle a m.ignored_user_list event, updating the room state if necessary.
@ -489,18 +500,252 @@ impl Client {
// TODO notify our callers of new devices via some callback. // TODO notify our callers of new devices via some callback.
Ok(()) Ok(())
} }
pub(crate) async fn emit_timeline_event(&mut self, room_id: &RoomId, event: &mut RoomEvent) {
match event {
RoomEvent::RoomMember(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_member(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomName(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_name(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomCanonicalAlias(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_canonical_alias(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomAliases(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_aliases(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomAvatar(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_avatar(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomMessage(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_message(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomMessageFeedback(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_message_feedback(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomRedaction(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_redaction(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
RoomEvent::RoomPowerLevels(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_room_power_levels(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
_ => {}
}
}
pub(crate) async fn emit_state_event(&mut self, room_id: &RoomId, event: &mut StateEvent) {
match event {
StateEvent::RoomMember(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_state_member(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
StateEvent::RoomName(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_state_name(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
StateEvent::RoomCanonicalAlias(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_state_canonical_alias(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
StateEvent::RoomAliases(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_state_aliases(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
StateEvent::RoomAvatar(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_state_avatar(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
StateEvent::RoomPowerLevels(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_state_power_levels(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
StateEvent::RoomJoinRules(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_state_join_rules(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
_ => {}
}
}
pub(crate) async fn emit_account_data_event(
&mut self,
room_id: &RoomId,
event: &mut NonRoomEvent,
) {
match event {
NonRoomEvent::Presence(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_account_presence(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
NonRoomEvent::IgnoredUserList(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_account_ignored_users(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
NonRoomEvent::PushRules(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_account_push_rules(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
NonRoomEvent::FullyRead(_) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_account_data_fully_read(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
_ => {}
}
}
pub(crate) async fn emit_presence_event(
&mut self,
room_id: &RoomId,
event: &mut PresenceEvent,
) {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id.to_string()) {
ee.lock()
.await
.on_presence_event(Arc::clone(&room), Arc::new(Mutex::new(event.clone())))
.await;
}
}
}
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*;
use crate::events::room::member::MembershipState;
use crate::identifiers::UserId; use crate::identifiers::UserId;
use crate::{AsyncClient, Session, SyncSettings}; use crate::{AsyncClient, Session, SyncSettings};
use mockito::{mock, Matcher}; use mockito::{mock, Matcher};
use tokio::runtime::Runtime;
use url::Url; use url::Url;
use std::convert::TryFrom; use std::convert::TryFrom;

76
src/event_emitter/mod.rs Normal file
View File

@ -0,0 +1,76 @@
// Copyright 2020 Damir Jelić
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock};
use crate::events::collections::all::{RoomEvent, StateEvent};
use crate::events::collections::only::Event as NonRoomEvent;
use crate::events::presence::PresenceEvent;
use crate::models::Room;
use tokio::sync::Mutex;
///
#[async_trait::async_trait]
pub trait EventEmitter: Send + Sync {
// ROOM EVENTS from `IncomingTimeline`
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMember` event.
async fn on_room_member(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomName` event.
async fn on_room_name(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomCanonicalAlias` event.
async fn on_room_canonical_alias(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomAliases` event.
async fn on_room_aliases(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomAvatar` event.
async fn on_room_avatar(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMessage` event.
async fn on_room_message(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomMessageFeedback` event.
async fn on_room_message_feedback(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomRedaction` event.
async fn on_room_redaction(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
/// Fires when `AsyncClient` receives a `RoomEvent::RoomPowerLevels` event.
async fn on_room_power_levels(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<RoomEvent>>) {}
// `RoomEvent`s from `IncomingState`
/// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event.
async fn on_state_member(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomName` event.
async fn on_state_name(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event.
async fn on_state_canonical_alias(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event.
async fn on_state_aliases(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event.
async fn on_state_avatar(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event.
async fn on_state_power_levels(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event.
async fn on_state_join_rules(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<StateEvent>>) {}
// `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData`
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event.
async fn on_account_presence(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomName` event.
async fn on_account_ignored_users(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomCanonicalAlias` event.
async fn on_account_push_rules(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event.
async fn on_account_data_fully_read(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<NonRoomEvent>>) {}
// `PresenceEvent` is a struct so there is only the one method
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomAliases` event.
async fn on_presence_event(&mut self, _: Arc<RwLock<Room>>, _: Arc<Mutex<PresenceEvent>>) {}
}

View File

@ -32,9 +32,12 @@ pub use ruma_client_api as api;
pub use ruma_events as events; pub use ruma_events as events;
pub use ruma_identifiers as identifiers; pub use ruma_identifiers as identifiers;
pub use ruma_api as ruma_traits;
mod async_client; mod async_client;
mod base_client; mod base_client;
mod error; mod error;
mod event_emitter;
mod models; mod models;
mod session; mod session;
@ -43,6 +46,7 @@ mod crypto;
pub use async_client::{AsyncClient, AsyncClientConfig, SyncSettings}; pub use async_client::{AsyncClient, AsyncClientConfig, SyncSettings};
pub use base_client::Client; pub use base_client::Client;
pub use event_emitter::EventEmitter;
pub use models::Room; pub use models::Room;
pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION"); pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION");

View File

@ -6,5 +6,6 @@ pub use room::{Room, RoomName};
pub use room_member::RoomMember; pub use room_member::RoomMember;
pub use user::User; pub use user::User;
#[allow(dead_code)]
pub type Token = String; pub type Token = String;
pub type UserId = String; pub type UserId = String;

View File

@ -14,11 +14,11 @@
// limitations under the License. // limitations under the License.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use super::{RoomMember, User, UserId}; use super::{RoomMember, UserId};
use crate::events::collections::all::{RoomEvent, StateEvent}; use crate::events::collections::all::{RoomEvent, StateEvent};
use crate::events::presence::PresenceEvent;
use crate::events::room::{ use crate::events::room::{
aliases::AliasesEvent, aliases::AliasesEvent,
canonical_alias::CanonicalAliasEvent, canonical_alias::CanonicalAliasEvent,
@ -27,12 +27,7 @@ use crate::events::room::{
name::NameEvent, name::NameEvent,
power_levels::PowerLevelsEvent, power_levels::PowerLevelsEvent,
}; };
use crate::events::{
presence::{PresenceEvent, PresenceEventContent},
EventResult,
};
use crate::identifiers::RoomAliasId; use crate::identifiers::RoomAliasId;
use crate::session::Session;
use js_int::UInt; use js_int::UInt;
@ -317,14 +312,11 @@ impl Room {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*;
use crate::events::room::member::MembershipState; use crate::events::room::member::MembershipState;
use crate::identifiers::UserId; use crate::identifiers::UserId;
use crate::{AsyncClient, Session, SyncSettings}; use crate::{AsyncClient, Session, SyncSettings};
use mockito::{mock, Matcher}; use mockito::{mock, Matcher};
use tokio::runtime::Runtime;
use url::Url; use url::Url;
use std::convert::TryFrom; use std::convert::TryFrom;
@ -363,7 +355,7 @@ mod test {
.unwrap(); .unwrap();
assert_eq!(2, room.members.len()); assert_eq!(2, room.members.len());
for (id, member) in &room.members { for (_id, member) in &room.members {
assert_eq!(MembershipState::Join, member.membership); assert_eq!(MembershipState::Join, member.membership);
} }
} }

View File

@ -16,20 +16,14 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use super::User; use super::User;
use crate::api::r0 as api; use crate::events::collections::all::Event;
use crate::events::collections::all::{Event, RoomEvent, StateEvent};
use crate::events::room::{ use crate::events::room::{
aliases::AliasesEvent, member::{MemberEvent, MembershipChange, MembershipState},
canonical_alias::CanonicalAliasEvent,
member::{MemberEvent, MemberEventContent, MembershipChange, MembershipState},
name::NameEvent,
power_levels::PowerLevelsEvent, power_levels::PowerLevelsEvent,
}; };
use crate::events::EventResult; use crate::identifiers::UserId;
use crate::identifiers::{RoomAliasId, UserId};
use crate::session::Session;
use js_int::{Int, UInt}; use js_int::Int;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -108,7 +102,7 @@ impl RoomMember {
max_power = *power.max(&max_power); max_power = *power.max(&max_power);
} }
let mut changed = false; let changed;
if let Some(user_power) = event.content.users.get(&self.user_id) { if let Some(user_power) = event.content.users.get(&self.user_id) {
changed = self.power_level != Some(*user_power); changed = self.power_level != Some(*user_power);
self.power_level = Some(*user_power); self.power_level = Some(*user_power);
@ -134,7 +128,6 @@ mod test {
use js_int::{Int, UInt}; use js_int::{Int, UInt};
use mockito::{mock, Matcher}; use mockito::{mock, Matcher};
use tokio::runtime::Runtime;
use url::Url; use url::Url;
use std::collections::HashMap; use std::collections::HashMap;
@ -177,7 +170,7 @@ mod test {
.write() .write()
.unwrap(); .unwrap();
for (id, member) in &mut room.members { for (_id, member) in &mut room.members {
let power = power_levels(); let power = power_levels();
assert!(member.update_power(&power)); assert!(member.update_power(&power));
assert_eq!(MembershipState::Join, member.membership); assert_eq!(MembershipState::Join, member.membership);

View File

@ -13,22 +13,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashMap; use crate::events::collections::all::Event;
use std::sync::{Arc, RwLock};
use super::UserId;
use crate::api::r0 as api;
use crate::events::collections::all::{Event, RoomEvent, StateEvent};
use crate::events::presence::{PresenceEvent, PresenceEventContent, PresenceState}; use crate::events::presence::{PresenceEvent, PresenceEventContent, PresenceState};
use crate::events::room::{ use crate::events::room::member::MemberEvent;
aliases::AliasesEvent,
canonical_alias::CanonicalAliasEvent,
member::{MemberEvent, MembershipState},
name::NameEvent,
};
use crate::events::EventResult;
use crate::identifiers::RoomAliasId;
use crate::session::Session;
use js_int::UInt; use js_int::UInt;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]

View File

@ -2,7 +2,6 @@ use matrix_sdk::identifiers::UserId;
use matrix_sdk::{AsyncClient, Session, SyncSettings}; use matrix_sdk::{AsyncClient, Session, SyncSettings};
use mockito::{mock, Matcher}; use mockito::{mock, Matcher};
use tokio::runtime::Runtime;
use url::Url; use url::Url;
use std::convert::TryFrom; use std::convert::TryFrom;