diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index c4cc92b5..62e03396 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -31,9 +31,11 @@ appservice = ["ruma/appservice-api-s", "ruma/appservice-api-helper", "ruma/rand" docs = ["encryption", "sled_cryptostore", "sled_state_store", "sso_login"] [dependencies] +anyhow = { version = "1.0.42", optional = true } dashmap = "4.0.2" futures = "0.3.15" http = "0.2.4" +serde = "1.0.126" serde_json = "1.0.64" thiserror = "1.0.25" tracing = "0.1.26" diff --git a/matrix_sdk/examples/autojoin.rs b/matrix_sdk/examples/autojoin.rs index 6838b7c1..5db51444 100644 --- a/matrix_sdk/examples/autojoin.rs +++ b/matrix_sdk/examples/autojoin.rs @@ -1,61 +1,41 @@ use std::{env, process::exit}; use matrix_sdk::{ - async_trait, room::Room, ruma::events::{room::member::MemberEventContent, StrippedStateEvent}, - Client, ClientConfig, EventHandler, SyncSettings, + Client, ClientConfig, SyncSettings, }; use tokio::time::{sleep, Duration}; use url::Url; -struct AutoJoinBot { +async fn on_stripped_state_member( + room_member: StrippedStateEvent, client: Client, -} - -impl AutoJoinBot { - pub fn new(client: Client) -> Self { - Self { client } + room: Room, +) { + if room_member.state_key != client.user_id().await.unwrap() { + return; } -} -#[async_trait] -impl EventHandler for AutoJoinBot { - async fn on_stripped_state_member( - &self, - room: Room, - room_member: &StrippedStateEvent, - _: Option, - ) { - if room_member.state_key != self.client.user_id().await.unwrap() { - return; - } + if let Room::Invited(room) = room { + println!("Autojoining room {}", room.room_id()); + let mut delay = 2; - if let Room::Invited(room) = room { - println!("Autojoining room {}", room.room_id()); - let mut delay = 2; + while let Err(err) = room.accept_invitation().await { + // retry autojoin due to synapse sending invites, before the + // invited user can join for more information see + // https://github.com/matrix-org/synapse/issues/4345 + eprintln!("Failed to join room {} ({:?}), retrying in {}s", room.room_id(), err, delay); - while let Err(err) = room.accept_invitation().await { - // retry autojoin due to synapse sending invites, before the - // invited user can join for more information see - // https://github.com/matrix-org/synapse/issues/4345 - eprintln!( - "Failed to join room {} ({:?}), retrying in {}s", - room.room_id(), - err, - delay - ); + sleep(Duration::from_secs(delay)).await; + delay *= 2; - sleep(Duration::from_secs(delay)).await; - delay *= 2; - - if delay > 3600 { - eprintln!("Can't join room {} ({:?})", room.room_id(), err); - break; - } + if delay > 3600 { + eprintln!("Can't join room {} ({:?})", room.room_id(), err); + break; } - println!("Successfully joined room {}", room.room_id()); } + println!("Successfully joined room {}", room.room_id()); } } @@ -76,7 +56,7 @@ async fn login_and_sync( println!("logged in as {}", username); - client.set_event_handler(Box::new(AutoJoinBot::new(client.clone()))).await; + client.register_event_handler(on_stripped_state_member).await; client.sync(SyncSettings::default()).await; diff --git a/matrix_sdk/examples/command_bot.rs b/matrix_sdk/examples/command_bot.rs index 90f00e38..77fbc34e 100644 --- a/matrix_sdk/examples/command_bot.rs +++ b/matrix_sdk/examples/command_bot.rs @@ -1,55 +1,43 @@ use std::{env, process::exit}; use matrix_sdk::{ - async_trait, room::Room, ruma::events::{ room::message::{MessageEventContent, MessageType, TextMessageEventContent}, AnyMessageEventContent, SyncMessageEvent, }, - Client, ClientConfig, EventHandler, SyncSettings, + Client, ClientConfig, SyncSettings, }; use url::Url; -struct CommandBot; +async fn on_room_message(event: SyncMessageEvent, room: Room) { + if let Room::Joined(room) = room { + let msg_body = if let SyncMessageEvent { + content: + MessageEventContent { + msgtype: MessageType::Text(TextMessageEventContent { body: msg_body, .. }), + .. + }, + .. + } = event + { + msg_body + } else { + return; + }; -impl CommandBot { - pub fn new() -> Self { - Self {} - } -} + if msg_body.contains("!party") { + let content = AnyMessageEventContent::RoomMessage(MessageEventContent::text_plain( + "πŸŽ‰πŸŽŠπŸ₯³ let's PARTY!! πŸ₯³πŸŽŠπŸŽ‰", + )); -#[async_trait] -impl EventHandler for CommandBot { - async fn on_room_message(&self, room: Room, event: &SyncMessageEvent) { - if let Room::Joined(room) = room { - let msg_body = if let SyncMessageEvent { - content: - MessageEventContent { - msgtype: MessageType::Text(TextMessageEventContent { body: msg_body, .. }), - .. - }, - .. - } = event - { - msg_body - } else { - return; - }; + println!("sending"); - if msg_body.contains("!party") { - let content = AnyMessageEventContent::RoomMessage(MessageEventContent::text_plain( - "πŸŽ‰πŸŽŠπŸ₯³ let's PARTY!! πŸ₯³πŸŽŠπŸŽ‰", - )); + // send our message to the room we found the "!party" command in + // the last parameter is an optional Uuid which we don't care about. + room.send(content, None).await.unwrap(); - println!("sending"); - - // send our message to the room we found the "!party" command in - // the last parameter is an optional Uuid which we don't care about. - room.send(content, None).await.unwrap(); - - println!("message sent"); - } + println!("message sent"); } } } @@ -79,7 +67,7 @@ async fn login_and_sync( client.sync_once(SyncSettings::default()).await.unwrap(); // add our CommandBot to be notified of incoming messages, we do this after the // initial sync to avoid responding to messages before the bot was running. - client.set_event_handler(Box::new(CommandBot::new())).await; + client.register_event_handler(on_room_message).await; // since we called `sync_once` before we entered our sync loop we must pass // that sync token to `sync` diff --git a/matrix_sdk/examples/image_bot.rs b/matrix_sdk/examples/image_bot.rs index 7df95ffd..a895c549 100644 --- a/matrix_sdk/examples/image_bot.rs +++ b/matrix_sdk/examples/image_bot.rs @@ -8,56 +8,46 @@ use std::{ }; use matrix_sdk::{ - self, async_trait, + self, room::Room, ruma::events::{ room::message::{MessageEventContent, MessageType, TextMessageEventContent}, SyncMessageEvent, }, - Client, EventHandler, SyncSettings, + Client, SyncSettings, }; use tokio::sync::Mutex; use url::Url; -struct ImageBot { +async fn on_room_message( + event: SyncMessageEvent, + room: Room, image: Arc>, -} +) { + if let Room::Joined(room) = room { + let msg_body = if let SyncMessageEvent { + content: + MessageEventContent { + msgtype: MessageType::Text(TextMessageEventContent { body: msg_body, .. }), + .. + }, + .. + } = event + { + msg_body + } else { + return; + }; -impl ImageBot { - pub fn new(image: File) -> Self { - let image = Arc::new(Mutex::new(image)); - Self { image } - } -} + if msg_body.contains("!image") { + println!("sending image"); + let mut image = image.lock().await; -#[async_trait] -impl EventHandler for ImageBot { - async fn on_room_message(&self, room: Room, event: &SyncMessageEvent) { - if let Room::Joined(room) = room { - let msg_body = if let SyncMessageEvent { - content: - MessageEventContent { - msgtype: MessageType::Text(TextMessageEventContent { body: msg_body, .. }), - .. - }, - .. - } = event - { - msg_body - } else { - return; - }; + room.send_attachment("cat", &mime::IMAGE_JPEG, &mut *image, None).await.unwrap(); - if msg_body.contains("!image") { - println!("sending image"); - let mut image = self.image.lock().await; + image.seek(SeekFrom::Start(0)).unwrap(); - room.send_attachment("cat", &mime::IMAGE_JPEG, &mut *image, None).await.unwrap(); - - image.seek(SeekFrom::Start(0)).unwrap(); - - println!("message sent"); - } + println!("message sent"); } } } @@ -74,7 +64,9 @@ async fn login_and_sync( client.login(&username, &password, None, Some("command bot")).await?; client.sync_once(SyncSettings::default()).await.unwrap(); - client.set_event_handler(Box::new(ImageBot::new(image))).await; + + let image = Arc::new(Mutex::new(image)); + client.register_event_handler(move |ev, room| on_room_message(ev, room, image.clone())).await; let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); client.sync(settings).await; diff --git a/matrix_sdk/examples/login.rs b/matrix_sdk/examples/login.rs index fa6ab054..b34e6e6f 100644 --- a/matrix_sdk/examples/login.rs +++ b/matrix_sdk/examples/login.rs @@ -1,36 +1,31 @@ use std::{env, process::exit}; use matrix_sdk::{ - self, async_trait, + self, room::Room, ruma::events::{ room::message::{MessageEventContent, MessageType, TextMessageEventContent}, SyncMessageEvent, }, - Client, EventHandler, SyncSettings, + Client, SyncSettings, }; use url::Url; -struct EventCallback; - -#[async_trait] -impl EventHandler for EventCallback { - async fn on_room_message(&self, room: Room, event: &SyncMessageEvent) { - if let Room::Joined(room) = room { - if let SyncMessageEvent { - content: - MessageEventContent { - msgtype: MessageType::Text(TextMessageEventContent { body: msg_body, .. }), - .. - }, - sender, - .. - } = event - { - let member = room.get_member(sender).await.unwrap().unwrap(); - let name = member.display_name().unwrap_or_else(|| member.user_id().as_str()); - println!("{}: {}", name, msg_body); - } +async fn on_room_message(event: SyncMessageEvent, room: Room) { + if let Room::Joined(room) = room { + if let SyncMessageEvent { + content: + MessageEventContent { + msgtype: MessageType::Text(TextMessageEventContent { body: msg_body, .. }), + .. + }, + sender, + .. + } = event + { + let member = room.get_member(&sender).await.unwrap().unwrap(); + let name = member.display_name().unwrap_or_else(|| member.user_id().as_str()); + println!("{}: {}", name, msg_body); } } } @@ -43,7 +38,7 @@ async fn login( let homeserver_url = Url::parse(&homeserver_url).expect("Couldn't parse the homeserver URL"); let client = Client::new(homeserver_url).unwrap(); - client.set_event_handler(Box::new(EventCallback)).await; + client.register_event_handler(on_room_message).await; client.login(username, password, None, Some("rust-sdk")).await?; client.sync(SyncSettings::new()).await; diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 0656cdaf..d7cd11b2 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -13,12 +13,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(feature = "encryption")] +use std::io::{Cursor, Write}; #[cfg(all(feature = "encryption", not(target_arch = "wasm32")))] use std::path::PathBuf; -#[cfg(feature = "encryption")] use std::{ collections::BTreeMap, - io::{Cursor, Write}, + fmt::{self, Debug}, + future::Future, + io::Read, + path::Path, + pin::Pin, + result::Result as StdResult, + sync::Arc, }; #[cfg(feature = "sso_login")] use std::{ @@ -26,16 +33,9 @@ use std::{ io::{Error as IoError, ErrorKind as IoErrorKind}, ops::Range, }; -use std::{ - fmt::{self, Debug}, - future::Future, - io::Read, - path::Path, - result::Result as StdResult, - sync::Arc, -}; use dashmap::DashMap; +use futures::FutureExt; use futures_timer::Delay as sleep; use http::HeaderValue; #[cfg(feature = "sso_login")] @@ -50,7 +50,7 @@ use matrix_sdk_base::crypto::{ #[cfg(feature = "encryption")] use matrix_sdk_base::deserialized_responses::RoomEvent; use matrix_sdk_base::{ - deserialized_responses::SyncResponse, + deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse}, media::{MediaEventContent, MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}, BaseClient, BaseClientConfig, Session, Store, }; @@ -60,14 +60,19 @@ use rand::{thread_rng, Rng}; use reqwest::header::InvalidHeaderValue; #[cfg(feature = "encryption")] use ruma::events::{AnyMessageEvent, AnyRoomEvent, AnySyncMessageEvent}; -use ruma::{api::SendAccessToken, events::AnyMessageEventContent, MxcUri}; +use ruma::{ + api::{client::r0::push::get_notifications::Notification, SendAccessToken}, + events::AnyMessageEventContent, + MxcUri, +}; +use serde::de::DeserializeOwned; #[cfg(feature = "sso_login")] use tokio::{net::TcpListener, sync::oneshot}; #[cfg(feature = "sso_login")] use tokio_stream::wrappers::TcpListenerStream; #[cfg(feature = "encryption")] -use tracing::{debug, warn}; -use tracing::{error, info, instrument}; +use tracing::debug; +use tracing::{error, info, instrument, warn}; use url::Url; #[cfg(feature = "sso_login")] use warp::Filter; @@ -138,9 +143,9 @@ use crate::{ }; use crate::{ error::HttpError, - event_handler::Handler, + event_handler::{EventHandler, EventHandlerData, EventHandlerResult, EventKind, SyncEvent}, http_client::{client_with_config, HttpClient, HttpSend}, - room, Error, EventHandler, Result, + room, Error, Result, }; const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); @@ -156,6 +161,14 @@ const SSO_SERVER_BIND_RANGE: Range = 20000..30000; #[cfg(feature = "sso_login")] const SSO_SERVER_BIND_TRIES: u8 = 10; +type EventHandlerFut = Pin + Send>>; +type EventHandlerFn = Box) -> EventHandlerFut + Send + Sync>; +type EventHandlerMap = BTreeMap<(EventKind, &'static str), Vec>; + +type NotificationHandlerFut = EventHandlerFut; +type NotificationHandlerFn = + Box NotificationHandlerFut + Send + Sync>; + /// An async/await enabled Matrix client. /// /// All of the state is held in an `Arc` so the `Client` can be cloned freely. @@ -176,9 +189,10 @@ pub struct Client { key_claim_lock: Arc>, pub(crate) members_request_locks: Arc>>>, pub(crate) typing_notice_times: Arc>, - /// Any implementor of EventHandler will act as the callbacks for various - /// events. - event_handler: Arc>>, + /// Event handlers. See `register_event_handler`. + pub(crate) event_handlers: Arc>, + /// Notification handlers. See `register_notification_handler`. + notification_handlers: Arc>>, /// Whether the client should operate in application service style mode. /// This is low-level functionality. For an high-level API check the /// `matrix_sdk_appservice` crate. @@ -557,12 +571,13 @@ impl Client { http_client, base_client, #[cfg(feature = "encryption")] - group_session_locks: Arc::new(DashMap::new()), + group_session_locks: Default::default(), #[cfg(feature = "encryption")] - key_claim_lock: Arc::new(Mutex::new(())), - members_request_locks: Arc::new(DashMap::new()), - typing_notice_times: Arc::new(DashMap::new()), - event_handler: Arc::new(RwLock::new(None)), + key_claim_lock: Default::default(), + members_request_locks: Default::default(), + typing_notice_times: Default::default(), + event_handlers: Default::default(), + notification_handlers: Default::default(), appservice_mode: config.appservice_mode, }) } @@ -666,12 +681,7 @@ impl Client { ) -> Result<()> { let txn_id = incoming_transaction.txn_id.clone(); let response = incoming_transaction.try_into_sync_response(txn_id)?; - let base_client = self.base_client.clone(); - let sync_response = base_client.receive_sync_response(response).await?; - - if let Some(handler) = self.event_handler.read().await.as_ref() { - handler.handle_sync(&sync_response).await; - } + self.process_sync(response).await?; Ok(()) } @@ -867,13 +877,125 @@ impl Client { Ok(()) } - /// Add `EventHandler` to `Client`. + /// Register a handler for a specific event type. /// - /// The methods of `EventHandler` are called when the respective - /// `RoomEvents` occur. - pub async fn set_event_handler(&self, handler: Box) { - let handler = Handler { inner: handler, client: self.clone() }; - *self.event_handler.write().await = Some(handler); + /// The handler is a function or closure with one or more arguments. The + /// first argument is the event itself. All additional arguments are + /// "context" arguments: They have to implement [`EventHandlerContext`]. + /// This trait is named that way because most of the types implementing it + /// give additional context about an event: The room it was in, its raw form + /// and other similar things. As an exception to this, + /// [`Client`] also implements the `EventHandlerContext` trait + /// so you don't have to clone your client into the event handler manually. + /// + /// Some context arguments are not universally applicable. A context + /// argument that isn't available for the given event type will result in + /// the event handler being skipped and an error being logged. The following + /// context argument types are only available for a subset of event types: + /// + /// * [`Room`][room::Room] is only available for room-specific events, i.e. + /// not for events like global account data events or presence events + /// + /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext + /// + /// # Examples + /// + /// ```no_run + /// # let client: matrix_sdk::Client = unimplemented!(); + /// use matrix_sdk::{ + /// room::Room, + /// ruma::{ + /// events::{ + /// macros::EventContent, + /// push_rules::PushRulesEvent, + /// room::{message::MessageEventContent, topic::TopicEventContent}, + /// SyncMessageEvent, SyncStateEvent, + /// }, + /// Int, MilliSecondsSinceUnixEpoch, + /// }, + /// Client, + /// }; + /// use serde::{Deserialize, Serialize}; + /// + /// # let _ = async { + /// client + /// .register_event_handler( + /// |ev: SyncMessageEvent, room: Room, client: Client| async move { + /// // Common usage: Room event plus room and client. + /// }, + /// ) + /// .await + /// .register_event_handler(|ev: SyncStateEvent| async move { + /// // Also possible: Omit any or all arguments after the first. + /// }) + /// .await; + /// + /// // Custom events work exactly the same way, you just need to declare the content struct and + /// // use the EventContent derive macro on it. + /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)] + /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = Message)] + /// struct TokenEventContent { + /// token: String, + /// #[serde(rename = "exp")] + /// expires_at: MilliSecondsSinceUnixEpoch, + /// } + /// + /// client.register_event_handler( + /// |ev: SyncMessageEvent, room: Room| async move { + /// todo!("Display the token"); + /// }, + /// ).await; + /// # }; + /// ``` + pub async fn register_event_handler(&self, handler: H) -> &Self + where + Ev: SyncEvent + DeserializeOwned + Send + 'static, + H: EventHandler, + ::Output: EventHandlerResult, + { + let event_type = H::ID.1; + self.event_handlers.write().await.entry(H::ID).or_default().push(Box::new(move |data| { + let maybe_fut = serde_json::from_str(data.raw.get()) + .map(|ev| handler.clone().handle_event(ev, data)); + + async move { + match maybe_fut { + Ok(Some(fut)) => { + fut.await.print_error(event_type); + } + Ok(None) => { + error!("Event handler for {} has an invalid context argument", event_type); + } + Err(e) => { + warn!( + "Failed to deserialize `{}` event, skipping event handler.\n\ + Deserialization error: {}", + event_type, e, + ); + } + } + } + .boxed() + })); + + self + } + + /// Register a handler for a notification. + /// + /// Similar to `.register_event_handler`, but only allows functions or + /// closures with exactly the three arguments `Notification`, `room::Room`, + /// `Client` for now. + pub async fn register_notification_handler(&self, handler: H) -> &Self + where + H: Fn(Notification, room::Room, Client) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + { + self.notification_handlers.write().await.push(Box::new( + move |notification, room, client| (handler)(notification, room, client).boxed(), + )); + + self } /// Get all the rooms the client knows about. @@ -1957,13 +2079,91 @@ impl Client { ); let response = self.send(request, Some(request_config)).await?; - let sync_response = self.base_client.receive_sync_response(response).await?; + self.process_sync(response).await + } - if let Some(handler) = self.event_handler.read().await.as_ref() { - handler.handle_sync(&sync_response).await; + async fn process_sync(&self, response: sync_events::Response) -> Result { + let response = self.base_client.receive_sync_response(response).await?; + let SyncResponse { + next_batch: _, + rooms, + presence, + account_data, + to_device: _, + device_lists: _, + device_one_time_keys_count: _, + ambiguity_changes: _, + notifications, + } = &response; + + self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?; + self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?; + + for (room_id, room_info) in &rooms.join { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } = + room_info; + + self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?; + self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) + .await?; + self.handle_sync_state_events(&room, &state.events).await?; + self.handle_sync_timeline_events(&room, &timeline.events).await?; } - Ok(sync_response) + for (room_id, room_info) in &rooms.leave { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + let LeftRoom { timeline, state, account_data } = room_info; + + self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events) + .await?; + self.handle_sync_state_events(&room, &state.events).await?; + self.handle_sync_timeline_events(&room, &timeline.events).await?; + } + + for (room_id, room_info) in &rooms.invite { + let room = self.get_room(room_id); + if room.is_none() { + error!("Can't call event handler, room {} not found", room_id); + continue; + } + + // FIXME: Destructure room_info + self.handle_sync_events(EventKind::InitialState, &room, &room_info.invite_state.events) + .await?; + } + + for handler in &*self.notification_handlers.read().await { + for (room_id, room_notifications) in notifications { + let room = match self.get_room(room_id) { + Some(room) => room, + None => { + warn!("Can't call notification handler, room {} not found", room_id); + continue; + } + }; + + for notification in room_notifications { + matrix_sdk_common::executor::spawn((handler)( + notification.clone(), + room.clone(), + self.clone(), + )); + } + } + } + + Ok(response) } /// Repeatedly call sync to synchronize the client state with the server. @@ -2766,8 +2966,10 @@ mod test { use std::{ collections::BTreeMap, convert::{TryFrom, TryInto}, + future, io::Cursor, str::FromStr, + sync::Arc, time::Duration, }; @@ -2797,17 +2999,18 @@ mod test { event_id, events::{ room::{ + member::MemberEventContent, message::{ImageMessageEventContent, MessageEventContent}, ImageInfo, }, - AnyMessageEventContent, AnySyncStateEvent, EventType, + AnyMessageEventContent, AnySyncStateEvent, EventType, SyncStateEvent, }, mxc_uri, room_id, thirdparty, uint, user_id, UserId, }; use serde_json::json; use super::{Client, Session, SyncSettings, Url}; - use crate::{ClientConfig, HttpError, RequestConfig, RoomMember}; + use crate::{room, ClientConfig, HttpError, RequestConfig, RoomMember}; async fn logged_in_client() -> Client { let session = Session { @@ -3067,6 +3270,56 @@ mod test { // assert_eq!(1, ignored_users.len()) } + #[tokio::test] + async fn event_handler() { + use std::sync::atomic::{AtomicU8, Ordering::SeqCst}; + + let client = logged_in_client().await; + + let member_count = Arc::new(AtomicU8::new(0)); + let typing_count = Arc::new(AtomicU8::new(0)); + let power_levels_count = Arc::new(AtomicU8::new(0)); + + client + .register_event_handler({ + let member_count = member_count.clone(); + move |_ev: SyncStateEvent, _room: room::Room| { + member_count.fetch_add(1, SeqCst); + future::ready(()) + } + }) + .await + .register_event_handler({ + let typing_count = typing_count.clone(); + move |_ev: SyncStateEvent| { + typing_count.fetch_add(1, SeqCst); + future::ready(()) + } + }) + .await + .register_event_handler({ + let power_levels_count = power_levels_count.clone(); + move |_ev: SyncStateEvent, + _client: Client, + _room: room::Room| { + power_levels_count.fetch_add(1, SeqCst); + future::ready(()) + } + }) + .await; + + let response = EventBuilder::default() + .add_room_event(EventsJson::Member) + .add_ephemeral(EventsJson::Typing) + .add_state_event(EventsJson::PowerLevels) + .build_sync_response(); + client.process_sync(response).await.unwrap(); + + assert_eq!(member_count.load(SeqCst), 1); + assert_eq!(typing_count.load(SeqCst), 1); + assert_eq!(power_levels_count.load(SeqCst), 1); + } + #[tokio::test] async fn room_creation() { let client = logged_in_client().await; diff --git a/matrix_sdk/src/event_handler.rs b/matrix_sdk/src/event_handler.rs new file mode 100644 index 00000000..60799eab --- /dev/null +++ b/matrix_sdk/src/event_handler.rs @@ -0,0 +1,439 @@ +// Copyright 2021 Jonas Platte +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Types and traits related for event handlers. For usage, see +//! [`Client::register_event_handler`]. +//! +//! ### How it works +//! +//! The `register_event_handler` method registers event handlers of different +//! signatures by actually storing boxed closures that all have the same +//! signature of `async (EventHandlerData) -> ()` where `EventHandlerData` is a +//! private type that contains all of the data an event handler *might* need. +//! +//! The stored closure takes care of deserializing the event which the +//! `EventHandlerData` contains as a (borrowed) [`serde_json::value::RawValue`], +//! extracing the context arguments from other fields of `EventHandlerData` and +//! calling / `.await`ing the event handler if the previous steps succeeded. +//! It also logs any errors from the above chain of function calls. +//! +//! For more details, see the [`EventHandler`] trait. + +use std::{borrow::Cow, future::Future, ops::Deref}; + +use matrix_sdk_base::deserialized_responses::SyncRoomEvent; +use ruma::{events::AnySyncStateEvent, serde::Raw}; +use serde::Deserialize; +use serde_json::value::RawValue as RawJsonValue; + +use crate::{room, Client}; + +#[doc(hidden)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum EventKind { + GlobalAccountData, + RoomAccountData, + EphemeralRoomData, + Message { redacted: bool }, + State { redacted: bool }, + StrippedState { redacted: bool }, + InitialState, + ToDevice, + Presence, +} + +/// A statically-known event kind/type that can be retrieved from an event sync. +pub trait SyncEvent { + #[doc(hidden)] + const ID: (EventKind, &'static str); +} + +/// Interface for event handlers. +/// +/// This trait is an abstraction for a certain kind of functions / closures, +/// specifically: +/// +/// * They must have at least one argument, which is the event itself, a type +/// that implements [`SyncEvent`]. Any additional arguments need to implement +/// the [`EventHandlerContext`] trait. +/// * Their return type has to be one of: `()`, `Result<(), impl +/// std::error::Error>` or `anyhow::Result<()>` (requires the `anyhow` Cargo +/// feature to be enabled) +/// +/// ### How it works +/// +/// This trait is basically a very constrained version of `Fn`: It requires at +/// least one argument, which is represented as its own generic parameter `Ev` +/// with the remaining parameter types being represented by the second generic +/// parameter `Ctx`; they have to be stuffed into one generic parameter as a +/// tuple because Rust doesn't have variadic generics. +/// +/// `Ev` and `Ctx` are generic parameters rather than associated types because +/// the argument list is a generic parameter for the `Fn` traits too, so a +/// single type could implement `Fn` multiple times with different argument +/// listsΒΉ. Luckily, when calling [`Client::register_event_handler`] with a +/// closure argument the trait solver takes into account that only a single one +/// of the implementations applies (even though this could theoretically change +/// through a dependency upgrade) and uses that rather than raising an ambiguity +/// error. This is the same trick used by web frameworks like actix-web and +/// axum. +/// +/// ΒΉ the only thing stopping such types from existing in stable Rust is that +/// all manual implementations of the `Fn` traits require a Nightly feature +pub trait EventHandler: Clone + Send + Sync + 'static { + /// The future returned by `handle_event`. + #[doc(hidden)] + type Future: Future + Send + 'static; + + /// The event type being handled, for example a message event of type + /// `m.room.message`. + #[doc(hidden)] + const ID: (EventKind, &'static str); + + /// Create a future for handling the given event. + /// + /// `data` provides additional data about the event, for example the room it + /// appeared in. + /// + /// Returns `None` if one of the context extractors failed. + #[doc(hidden)] + fn handle_event(&self, ev: Ev, data: EventHandlerData<'_>) -> Option; +} + +#[doc(hidden)] +#[derive(Debug)] +pub struct EventHandlerData<'a> { + pub client: Client, + pub room: Option, + pub raw: &'a RawJsonValue, +} + +/// Context for an event handler. +/// +/// This trait defines the set of types that may be used as additional arguments +/// in event handler functions after the event itself. +pub trait EventHandlerContext: Sized { + #[doc(hidden)] + fn from_data(_: &EventHandlerData<'_>) -> Option; +} + +impl EventHandlerContext for Client { + fn from_data(data: &EventHandlerData<'_>) -> Option { + Some(data.client.clone()) + } +} + +/// This event handler context argument is only applicable to room-specific +/// events. +/// +/// Trying to use it in the event handler for another event, for example a +/// global account data or presence event, will result in the event handler +/// being skipped and an error getting logged. +impl EventHandlerContext for room::Room { + fn from_data(data: &EventHandlerData<'_>) -> Option { + data.room.clone() + } +} + +/// The raw JSON form of an event. +/// +/// Used as a context argument for event handlers (see +/// [`Client::register_event_handler`]). +// FIXME: This could be made to not own the raw JSON value with some changes to +// the traits above, but only with GATs. +#[derive(Clone, Debug)] +pub struct RawEvent(pub Box); + +impl Deref for RawEvent { + type Target = RawJsonValue; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl EventHandlerContext for RawEvent { + fn from_data(data: &EventHandlerData<'_>) -> Option { + Some(Self(data.raw.to_owned())) + } +} + +/// Return types supported for event handlers implement this trait. +/// +/// It is not meant to be implemented outside of matrix-sdk. +pub trait EventHandlerResult: Sized { + #[doc(hidden)] + fn print_error(&self, event_type: &str); +} + +impl EventHandlerResult for () { + fn print_error(&self, _event_type: &str) {} +} + +impl EventHandlerResult for Result<(), E> { + fn print_error(&self, event_type: &str) { + if let Err(e) = self { + tracing::error!("Event handler for `{}` failed: {}", event_type, e); + } + } +} + +#[cfg(feature = "anyhow")] +impl EventHandlerResult for anyhow::Result<()> { + fn print_error(&self, event_type: &str) { + if let Err(e) = self { + tracing::error!("Event handler for `{}` failed: {:?}", event_type, e); + } + } +} + +#[derive(Deserialize)] +struct UnsignedDetails { + redacted_because: Option, +} + +/// Event handling internals. +impl Client { + pub(crate) async fn handle_sync_events( + &self, + kind: EventKind, + room: &Option, + events: &[Raw], + ) -> serde_json::Result<()> { + self.handle_sync_events_wrapped(kind, room, events, |x| x).await + } + + pub(crate) async fn handle_sync_state_events( + &self, + room: &Option, + state_events: &[Raw], + ) -> serde_json::Result<()> { + #[derive(Deserialize)] + struct StateEventDetails<'a> { + #[serde(borrow, rename = "type")] + event_type: Cow<'a, str>, + unsigned: Option, + } + + self.handle_sync_events_wrapped_with(room, state_events, std::convert::identity, |raw| { + let StateEventDetails { event_type, unsigned } = raw.deserialize_as()?; + let redacted = unsigned.and_then(|u| u.redacted_because).is_some(); + Ok((EventKind::State { redacted }, event_type)) + }) + .await + } + + pub(crate) async fn handle_sync_timeline_events( + &self, + room: &Option, + timeline_events: &[SyncRoomEvent], + ) -> serde_json::Result<()> { + // FIXME: add EncryptionInfo to context + #[derive(Deserialize)] + struct TimelineEventDetails<'a> { + #[serde(borrow, rename = "type")] + event_type: Cow<'a, str>, + state_key: Option, + unsigned: Option, + } + + self.handle_sync_events_wrapped_with( + room, + timeline_events, + |e| &e.event, + |raw| { + let TimelineEventDetails { event_type, state_key, unsigned } = + raw.deserialize_as()?; + + let redacted = unsigned.and_then(|u| u.redacted_because).is_some(); + let kind = match state_key { + Some(_) => EventKind::State { redacted }, + None => EventKind::Message { redacted }, + }; + + Ok((kind, event_type)) + }, + ) + .await + } + + async fn handle_sync_events_wrapped<'a, T: 'a, U: 'a>( + &self, + kind: EventKind, + room: &Option, + events: &'a [U], + get_event: impl Fn(&'a U) -> &'a Raw, + ) -> Result<(), serde_json::Error> { + #[derive(Deserialize)] + struct ExtractType<'a> { + #[serde(borrow, rename = "type")] + event_type: Cow<'a, str>, + } + + self.handle_sync_events_wrapped_with(room, events, get_event, |raw| { + Ok((kind, raw.deserialize_as::()?.event_type)) + }) + .await + } + + async fn handle_sync_events_wrapped_with<'a, T: 'a, U: 'a>( + &self, + room: &Option, + list: &'a [U], + get_event: impl Fn(&'a U) -> &'a Raw, + get_id: impl Fn(&Raw) -> serde_json::Result<(EventKind, Cow<'_, str>)>, + ) -> serde_json::Result<()> { + for x in list { + let event = get_event(x); + let (ev_kind, ev_type) = get_id(event)?; + let event_handler_id = (ev_kind, &*ev_type); + + if let Some(handlers) = self.event_handlers.read().await.get(&event_handler_id) { + for handler in &*handlers { + let data = EventHandlerData { + client: self.clone(), + room: room.clone(), + raw: event.json(), + }; + matrix_sdk_common::executor::spawn((handler)(data)); + } + } + } + + Ok(()) + } +} + +macro_rules! impl_event_handler { + ($($ty:ident),* $(,)?) => { + impl EventHandler for Fun + where + Ev: SyncEvent, + Fun: Fn(Ev, $($ty),*) -> Fut + Clone + Send + Sync + 'static, + Fut: Future + Send + 'static, + Fut::Output: EventHandlerResult, + $($ty: EventHandlerContext),* + { + type Future = Fut; + const ID: (EventKind, &'static str) = Ev::ID; + + fn handle_event(&self, ev: Ev, _d: EventHandlerData<'_>) -> Option { + Some((self)(ev, $($ty::from_data(&_d)?),*)) + } + } + }; +} + +impl_event_handler!(); +impl_event_handler!(A); +impl_event_handler!(A, B); +impl_event_handler!(A, B, C); +impl_event_handler!(A, B, C, D); +impl_event_handler!(A, B, C, D, E); +impl_event_handler!(A, B, C, D, E, F); +impl_event_handler!(A, B, C, D, E, F, G); +impl_event_handler!(A, B, C, D, E, F, G, H); + +mod static_events { + use ruma::events::{ + self, + presence::{PresenceEvent, PresenceEventContent}, + StaticEventContent, + }; + + use super::{EventKind, SyncEvent}; + + impl SyncEvent for events::GlobalAccountDataEvent + where + C: StaticEventContent + events::GlobalAccountDataEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::GlobalAccountData, C::TYPE); + } + + impl SyncEvent for events::RoomAccountDataEvent + where + C: StaticEventContent + events::RoomAccountDataEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::RoomAccountData, C::TYPE); + } + + impl SyncEvent for events::SyncEphemeralRoomEvent + where + C: StaticEventContent + events::EphemeralRoomEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::EphemeralRoomData, C::TYPE); + } + + impl SyncEvent for events::SyncMessageEvent + where + C: StaticEventContent + events::MessageEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::Message { redacted: false }, C::TYPE); + } + + impl SyncEvent for events::SyncStateEvent + where + C: StaticEventContent + events::StateEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::State { redacted: false }, C::TYPE); + } + + impl SyncEvent for events::StrippedStateEvent + where + C: StaticEventContent + events::StateEventContent, + { + const ID: (EventKind, &'static str) = + (EventKind::StrippedState { redacted: false }, C::TYPE); + } + + impl SyncEvent for events::InitialStateEvent + where + C: StaticEventContent + events::StateEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::InitialState, C::TYPE); + } + + impl SyncEvent for events::ToDeviceEvent + where + C: StaticEventContent + events::ToDeviceEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::ToDevice, C::TYPE); + } + + impl SyncEvent for PresenceEvent { + const ID: (EventKind, &'static str) = (EventKind::Presence, PresenceEventContent::TYPE); + } + + impl SyncEvent for events::RedactedSyncMessageEvent + where + C: StaticEventContent + events::RedactedMessageEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::Message { redacted: true }, C::TYPE); + } + + impl SyncEvent for events::RedactedSyncStateEvent + where + C: StaticEventContent + events::RedactedStateEventContent, + { + const ID: (EventKind, &'static str) = (EventKind::State { redacted: true }, C::TYPE); + } + + impl SyncEvent for events::RedactedStrippedStateEvent + where + C: StaticEventContent + events::RedactedStateEventContent, + { + const ID: (EventKind, &'static str) = + (EventKind::StrippedState { redacted: true }, C::TYPE); + } +} diff --git a/matrix_sdk/src/event_handler/mod.rs b/matrix_sdk/src/event_handler/mod.rs deleted file mode 100644 index e7cf2105..00000000 --- a/matrix_sdk/src/event_handler/mod.rs +++ /dev/null @@ -1,964 +0,0 @@ -// Copyright 2020 Damir JeliΔ‡ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -use std::ops::Deref; - -use matrix_sdk_base::{hoist_and_deserialize_state_event, hoist_room_event_prev_content}; -use matrix_sdk_common::async_trait; -use ruma::{ - api::client::r0::push::get_notifications::Notification, - events::{ - call::{ - answer::AnswerEventContent, candidates::CandidatesEventContent, - hangup::HangupEventContent, invite::InviteEventContent, - }, - custom::CustomEventContent, - fully_read::FullyReadEventContent, - ignored_user_list::IgnoredUserListEventContent, - presence::PresenceEvent, - push_rules::PushRulesEventContent, - reaction::ReactionEventContent, - receipt::ReceiptEventContent, - room::{ - aliases::AliasesEventContent, - avatar::AvatarEventContent, - canonical_alias::CanonicalAliasEventContent, - join_rules::JoinRulesEventContent, - member::MemberEventContent, - message::{feedback::FeedbackEventContent, MessageEventContent as MsgEventContent}, - name::NameEventContent, - power_levels::PowerLevelsEventContent, - redaction::SyncRedactionEvent, - tombstone::TombstoneEventContent, - }, - typing::TypingEventContent, - AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, - AnySyncEphemeralRoomEvent, AnySyncMessageEvent, AnySyncRoomEvent, AnySyncStateEvent, - GlobalAccountDataEvent, RoomAccountDataEvent, StrippedStateEvent, SyncEphemeralRoomEvent, - SyncMessageEvent, SyncStateEvent, - }, - serde::Raw, - RoomId, -}; -use serde_json::value::RawValue as RawJsonValue; - -use crate::{deserialized_responses::SyncResponse, room::Room, Client}; - -pub(crate) struct Handler { - pub(crate) inner: Box, - pub(crate) client: Client, -} - -impl Deref for Handler { - type Target = dyn EventHandler; - - fn deref(&self) -> &Self::Target { - &*self.inner - } -} - -impl Handler { - fn get_room(&self, room_id: &RoomId) -> Option { - self.client.get_room(room_id) - } - - pub(crate) async fn handle_sync(&self, response: &SyncResponse) { - for event in response.account_data.events.iter().filter_map(|e| e.deserialize().ok()) { - self.handle_account_data_event(&event).await; - } - - for (room_id, room_info) in &response.rooms.join { - if let Some(room) = self.get_room(room_id) { - for event in room_info.ephemeral.events.iter().filter_map(|e| e.deserialize().ok()) - { - self.handle_ephemeral_event(room.clone(), &event).await; - } - - for event in - room_info.account_data.events.iter().filter_map(|e| e.deserialize().ok()) - { - self.handle_room_account_data_event(room.clone(), &event).await; - } - - for (raw_event, event) in room_info.state.events.iter().filter_map(|e| { - if let Ok(d) = hoist_and_deserialize_state_event(e) { - Some((e, d)) - } else { - None - } - }) { - self.handle_state_event(room.clone(), &event, raw_event).await; - } - - for (raw_event, event) in room_info.timeline.events.iter().filter_map(|e| { - if let Ok(d) = hoist_room_event_prev_content(&e.event) { - Some((&e.event, d)) - } else { - None - } - }) { - self.handle_timeline_event(room.clone(), &event, raw_event).await; - } - } - } - - for (room_id, room_info) in &response.rooms.leave { - if let Some(room) = self.get_room(room_id) { - for event in - room_info.account_data.events.iter().filter_map(|e| e.deserialize().ok()) - { - self.handle_room_account_data_event(room.clone(), &event).await; - } - - for (raw_event, event) in room_info.state.events.iter().filter_map(|e| { - if let Ok(d) = hoist_and_deserialize_state_event(e) { - Some((e, d)) - } else { - None - } - }) { - self.handle_state_event(room.clone(), &event, raw_event).await; - } - - for (raw_event, event) in room_info.timeline.events.iter().filter_map(|e| { - if let Ok(d) = hoist_room_event_prev_content(&e.event) { - Some((&e.event, d)) - } else { - None - } - }) { - self.handle_timeline_event(room.clone(), &event, raw_event).await; - } - } - } - - for (room_id, room_info) in &response.rooms.invite { - if let Some(room) = self.get_room(room_id) { - for event in - room_info.invite_state.events.iter().filter_map(|e| e.deserialize().ok()) - { - self.handle_stripped_state_event(room.clone(), &event).await; - } - } - } - - for event in response.presence.events.iter().filter_map(|e| e.deserialize().ok()) { - self.on_presence_event(&event).await; - } - - for (room_id, notifications) in &response.notifications { - if let Some(room) = self.get_room(room_id) { - for notification in notifications { - self.on_room_notification(room.clone(), notification.clone()).await; - } - } - } - } - - async fn handle_timeline_event( - &self, - room: Room, - event: &AnySyncRoomEvent, - raw_event: &Raw, - ) { - match event { - AnySyncRoomEvent::State(event) => match event { - AnySyncStateEvent::RoomMember(e) => self.on_room_member(room, e).await, - AnySyncStateEvent::RoomName(e) => self.on_room_name(room, e).await, - AnySyncStateEvent::RoomCanonicalAlias(e) => { - self.on_room_canonical_alias(room, e).await - } - AnySyncStateEvent::RoomAliases(e) => self.on_room_aliases(room, e).await, - AnySyncStateEvent::RoomAvatar(e) => self.on_room_avatar(room, e).await, - AnySyncStateEvent::RoomPowerLevels(e) => self.on_room_power_levels(room, e).await, - AnySyncStateEvent::RoomTombstone(e) => self.on_room_tombstone(room, e).await, - AnySyncStateEvent::RoomJoinRules(e) => self.on_room_join_rules(room, e).await, - AnySyncStateEvent::PolicyRuleRoom(_) - | AnySyncStateEvent::PolicyRuleServer(_) - | AnySyncStateEvent::PolicyRuleUser(_) - | AnySyncStateEvent::RoomCreate(_) - | AnySyncStateEvent::RoomEncryption(_) - | AnySyncStateEvent::RoomGuestAccess(_) - | AnySyncStateEvent::RoomHistoryVisibility(_) - | AnySyncStateEvent::RoomPinnedEvents(_) - | AnySyncStateEvent::RoomServerAcl(_) - | AnySyncStateEvent::RoomThirdPartyInvite(_) - | AnySyncStateEvent::RoomTopic(_) - | AnySyncStateEvent::SpaceChild(_) - | AnySyncStateEvent::SpaceParent(_) => {} - _ => { - if let Ok(e) = raw_event.deserialize_as::>() - { - self.on_custom_event(room, &CustomEvent::State(&e)).await; - } - } - }, - AnySyncRoomEvent::Message(event) => match event { - AnySyncMessageEvent::RoomMessage(e) => self.on_room_message(room, e).await, - AnySyncMessageEvent::RoomMessageFeedback(e) => { - self.on_room_message_feedback(room, e).await - } - AnySyncMessageEvent::RoomRedaction(e) => self.on_room_redaction(room, e).await, - AnySyncMessageEvent::Reaction(e) => self.on_room_reaction(room, e).await, - AnySyncMessageEvent::CallInvite(e) => self.on_room_call_invite(room, e).await, - AnySyncMessageEvent::CallAnswer(e) => self.on_room_call_answer(room, e).await, - AnySyncMessageEvent::CallCandidates(e) => { - self.on_room_call_candidates(room, e).await - } - AnySyncMessageEvent::CallHangup(e) => self.on_room_call_hangup(room, e).await, - AnySyncMessageEvent::KeyVerificationReady(_) - | AnySyncMessageEvent::KeyVerificationStart(_) - | AnySyncMessageEvent::KeyVerificationCancel(_) - | AnySyncMessageEvent::KeyVerificationAccept(_) - | AnySyncMessageEvent::KeyVerificationKey(_) - | AnySyncMessageEvent::KeyVerificationMac(_) - | AnySyncMessageEvent::KeyVerificationDone(_) - | AnySyncMessageEvent::RoomEncrypted(_) - | AnySyncMessageEvent::Sticker(_) => {} - _ => { - if let Ok(e) = - raw_event.deserialize_as::>() - { - self.on_custom_event(room, &CustomEvent::Message(&e)).await; - } - } - }, - AnySyncRoomEvent::RedactedState(_event) => {} - AnySyncRoomEvent::RedactedMessage(_event) => {} - } - } - - async fn handle_state_event( - &self, - room: Room, - event: &AnySyncStateEvent, - raw_event: &Raw, - ) { - match event { - AnySyncStateEvent::RoomMember(member) => self.on_state_member(room, member).await, - AnySyncStateEvent::RoomName(name) => self.on_state_name(room, name).await, - AnySyncStateEvent::RoomCanonicalAlias(canonical) => { - self.on_state_canonical_alias(room, canonical).await - } - AnySyncStateEvent::RoomAliases(aliases) => self.on_state_aliases(room, aliases).await, - AnySyncStateEvent::RoomAvatar(avatar) => self.on_state_avatar(room, avatar).await, - AnySyncStateEvent::RoomPowerLevels(power) => { - self.on_state_power_levels(room, power).await - } - AnySyncStateEvent::RoomJoinRules(rules) => self.on_state_join_rules(room, rules).await, - AnySyncStateEvent::RoomTombstone(tomb) => { - // TODO make `on_state_tombstone` method - self.on_room_tombstone(room, tomb).await - } - AnySyncStateEvent::PolicyRuleRoom(_) - | AnySyncStateEvent::PolicyRuleServer(_) - | AnySyncStateEvent::PolicyRuleUser(_) - | AnySyncStateEvent::RoomCreate(_) - | AnySyncStateEvent::RoomEncryption(_) - | AnySyncStateEvent::RoomGuestAccess(_) - | AnySyncStateEvent::RoomHistoryVisibility(_) - | AnySyncStateEvent::RoomPinnedEvents(_) - | AnySyncStateEvent::RoomServerAcl(_) - | AnySyncStateEvent::RoomThirdPartyInvite(_) - | AnySyncStateEvent::RoomTopic(_) - | AnySyncStateEvent::SpaceChild(_) - | AnySyncStateEvent::SpaceParent(_) => {} - _ => { - if let Ok(e) = raw_event.deserialize_as::>() { - self.on_custom_event(room, &CustomEvent::State(&e)).await; - } - } - } - } - - pub(crate) async fn handle_stripped_state_event( - &self, - // TODO these events are only handled in invited rooms. - room: Room, - event: &AnyStrippedStateEvent, - ) { - match event { - AnyStrippedStateEvent::RoomMember(member) => { - self.on_stripped_state_member(room, member, None).await - } - AnyStrippedStateEvent::RoomName(name) => self.on_stripped_state_name(room, name).await, - AnyStrippedStateEvent::RoomCanonicalAlias(canonical) => { - self.on_stripped_state_canonical_alias(room, canonical).await - } - AnyStrippedStateEvent::RoomAliases(aliases) => { - self.on_stripped_state_aliases(room, aliases).await - } - AnyStrippedStateEvent::RoomAvatar(avatar) => { - self.on_stripped_state_avatar(room, avatar).await - } - AnyStrippedStateEvent::RoomPowerLevels(power) => { - self.on_stripped_state_power_levels(room, power).await - } - AnyStrippedStateEvent::RoomJoinRules(rules) => { - self.on_stripped_state_join_rules(room, rules).await - } - _ => {} - } - } - - pub(crate) async fn handle_room_account_data_event( - &self, - room: Room, - event: &AnyRoomAccountDataEvent, - ) { - if let AnyRoomAccountDataEvent::FullyRead(event) = event { - self.on_non_room_fully_read(room, event).await - } - } - - pub(crate) async fn handle_account_data_event(&self, event: &AnyGlobalAccountDataEvent) { - match event { - AnyGlobalAccountDataEvent::IgnoredUserList(ignored) => { - self.on_non_room_ignored_users(ignored).await - } - AnyGlobalAccountDataEvent::PushRules(rules) => self.on_non_room_push_rules(rules).await, - _ => {} - } - } - - pub(crate) async fn handle_ephemeral_event( - &self, - room: Room, - event: &AnySyncEphemeralRoomEvent, - ) { - match event { - AnySyncEphemeralRoomEvent::Typing(typing) => { - self.on_non_room_typing(room, typing).await - } - AnySyncEphemeralRoomEvent::Receipt(receipt) => { - self.on_non_room_receipt(room, receipt).await - } - _ => {} - } - } -} - -/// This represents the various "unrecognized" events. -#[derive(Clone, Copy, Debug)] -pub enum CustomEvent<'c> { - /// A custom basic event. - Basic(&'c GlobalAccountDataEvent), - /// A custom basic event. - EphemeralRoom(&'c SyncEphemeralRoomEvent), - /// A custom room event. - Message(&'c SyncMessageEvent), - /// A custom state event. - State(&'c SyncStateEvent), - /// A custom stripped state event. - StrippedState(&'c StrippedStateEvent), -} - -/// This trait allows any type implementing `EventHandler` to specify event -/// callbacks for each event. The `Client` calls each method when the -/// corresponding event is received. -/// -/// # Examples -/// ``` -/// # use std::ops::Deref; -/// # use std::sync::Arc; -/// # use std::{env, process::exit}; -/// # use matrix_sdk::{ -/// # async_trait, -/// # EventHandler, -/// # ruma::events::{ -/// # room::message::{MessageEventContent, MessageType, TextMessageEventContent}, -/// # SyncMessageEvent -/// # }, -/// # locks::RwLock, -/// # room::Room, -/// # }; -/// -/// struct EventCallback; -/// -/// #[async_trait] -/// impl EventHandler for EventCallback { -/// async fn on_room_message(&self, room: Room, event: &SyncMessageEvent) { -/// if let Room::Joined(room) = room { -/// if let SyncMessageEvent { -/// content: -/// MessageEventContent { -/// msgtype: MessageType::Text(TextMessageEventContent { body: msg_body, .. }), -/// .. -/// }, -/// sender, -/// .. -/// } = event -/// { -/// let member = room.get_member(&sender).await.unwrap().unwrap(); -/// let name = member -/// .display_name() -/// .unwrap_or_else(|| member.user_id().as_str()); -/// println!("{}: {}", name, msg_body); -/// } -/// } -/// } -/// } -/// ``` -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -pub trait EventHandler: Send + Sync { - // ROOM EVENTS from `IncomingTimeline` - /// Fires when `Client` receives a `RoomEvent::RoomMember` event. - async fn on_room_member(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomName` event. - async fn on_room_name(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomCanonicalAlias` event. - async fn on_room_canonical_alias( - &self, - _: Room, - _: &SyncStateEvent, - ) { - } - /// Fires when `Client` receives a `RoomEvent::RoomAliases` event. - async fn on_room_aliases(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomAvatar` event. - async fn on_room_avatar(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomMessage` event. - async fn on_room_message(&self, _: Room, _: &SyncMessageEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomMessageFeedback` event. - async fn on_room_message_feedback(&self, _: Room, _: &SyncMessageEvent) {} - /// Fires when `Client` receives a `RoomEvent::Reaction` event. - async fn on_room_reaction(&self, _: Room, _: &SyncMessageEvent) {} - /// Fires when `Client` receives a `RoomEvent::CallInvite` event - async fn on_room_call_invite(&self, _: Room, _: &SyncMessageEvent) {} - /// Fires when `Client` receives a `RoomEvent::CallAnswer` event - async fn on_room_call_answer(&self, _: Room, _: &SyncMessageEvent) {} - /// Fires when `Client` receives a `RoomEvent::CallCandidates` event - async fn on_room_call_candidates(&self, _: Room, _: &SyncMessageEvent) { - } - /// Fires when `Client` receives a `RoomEvent::CallHangup` event - async fn on_room_call_hangup(&self, _: Room, _: &SyncMessageEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomRedaction` event. - async fn on_room_redaction(&self, _: Room, _: &SyncRedactionEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomPowerLevels` event. - async fn on_room_power_levels(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `RoomEvent::RoomJoinRules` event. - async fn on_room_join_rules(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `RoomEvent::Tombstone` event. - async fn on_room_tombstone(&self, _: Room, _: &SyncStateEvent) {} - - /// Fires when `Client` receives room events that trigger notifications - /// according to the push rules of the user. - async fn on_room_notification(&self, _: Room, _: Notification) {} - - // `RoomEvent`s from `IncomingState` - /// Fires when `Client` receives a `StateEvent::RoomMember` event. - async fn on_state_member(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `StateEvent::RoomName` event. - async fn on_state_name(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `StateEvent::RoomCanonicalAlias` event. - async fn on_state_canonical_alias( - &self, - _: Room, - _: &SyncStateEvent, - ) { - } - /// Fires when `Client` receives a `StateEvent::RoomAliases` event. - async fn on_state_aliases(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `StateEvent::RoomAvatar` event. - async fn on_state_avatar(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `StateEvent::RoomPowerLevels` event. - async fn on_state_power_levels(&self, _: Room, _: &SyncStateEvent) {} - /// Fires when `Client` receives a `StateEvent::RoomJoinRules` event. - async fn on_state_join_rules(&self, _: Room, _: &SyncStateEvent) {} - - // `AnyStrippedStateEvent`s - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomMember` event. - async fn on_stripped_state_member( - &self, - _: Room, - _: &StrippedStateEvent, - _: Option, - ) { - } - /// Fires when `Client` receives a `AnyStrippedStateEvent::StrippedRoomName` - /// event. - async fn on_stripped_state_name(&self, _: Room, _: &StrippedStateEvent) {} - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomCanonicalAlias` event. - async fn on_stripped_state_canonical_alias( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomAliases` event. - async fn on_stripped_state_aliases( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomAvatar` event. - async fn on_stripped_state_avatar(&self, _: Room, _: &StrippedStateEvent) {} - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomPowerLevels` event. - async fn on_stripped_state_power_levels( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomJoinRules` event. - async fn on_stripped_state_join_rules( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - } - - // `NonRoomEvent` (this is a type alias from ruma_events) - /// Fires when `Client` receives a `NonRoomEvent::RoomPresence` event. - async fn on_non_room_presence(&self, _: Room, _: &PresenceEvent) {} - /// Fires when `Client` receives a `NonRoomEvent::RoomName` event. - async fn on_non_room_ignored_users( - &self, - _: &GlobalAccountDataEvent, - ) { - } - /// Fires when `Client` receives a `NonRoomEvent::RoomCanonicalAlias` event. - async fn on_non_room_push_rules(&self, _: &GlobalAccountDataEvent) {} - /// Fires when `Client` receives a `NonRoomEvent::RoomAliases` event. - async fn on_non_room_fully_read( - &self, - _: Room, - _: &RoomAccountDataEvent, - ) { - } - /// Fires when `Client` receives a `NonRoomEvent::Typing` event. - async fn on_non_room_typing(&self, _: Room, _: &SyncEphemeralRoomEvent) {} - /// Fires when `Client` receives a `NonRoomEvent::Receipt` event. - /// - /// This is always a read receipt. - async fn on_non_room_receipt(&self, _: Room, _: &SyncEphemeralRoomEvent) {} - - // `PresenceEvent` is a struct so there is only the one method - /// Fires when `Client` receives a `NonRoomEvent::RoomAliases` event. - async fn on_presence_event(&self, _: &PresenceEvent) {} - - /// Fires when `Client` receives a `Event::Custom` event or if - /// deserialization fails because the event was unknown to ruma. - /// - /// The only guarantee this method can give about the event is that it is - /// valid JSON. - async fn on_unrecognized_event(&self, _: Room, _: &RawJsonValue) {} - - /// Fires when `Client` receives a `Event::Custom` event or if - /// deserialization fails because the event was unknown to ruma. - /// - /// The only guarantee this method can give about the event is that it is in - /// the shape of a valid matrix event. - async fn on_custom_event(&self, _: Room, _: &CustomEvent<'_>) {} -} - -#[cfg(test)] -mod test { - use std::{sync::Arc, time::Duration}; - - use matrix_sdk_common::{async_trait, locks::Mutex}; - use matrix_sdk_test::{async_test, test_json}; - use mockito::{mock, Matcher}; - use ruma::user_id; - #[cfg(target_arch = "wasm32")] - pub use wasm_bindgen_test::*; - - use super::*; - - #[derive(Clone)] - pub struct EvHandlerTest(Arc>>); - - #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] - #[cfg_attr(not(target_arch = "wasm32"), async_trait)] - impl EventHandler for EvHandlerTest { - async fn on_room_member(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("member".to_string()) - } - async fn on_room_name(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("name".to_string()) - } - async fn on_room_canonical_alias( - &self, - _: Room, - _: &SyncStateEvent, - ) { - self.0.lock().await.push("canonical".to_string()) - } - async fn on_room_aliases(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("aliases".to_string()) - } - async fn on_room_avatar(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("avatar".to_string()) - } - async fn on_room_message(&self, _: Room, _: &SyncMessageEvent) { - self.0.lock().await.push("message".to_string()) - } - async fn on_room_message_feedback( - &self, - _: Room, - _: &SyncMessageEvent, - ) { - self.0.lock().await.push("feedback".to_string()) - } - async fn on_room_call_invite(&self, _: Room, _: &SyncMessageEvent) { - self.0.lock().await.push("call invite".to_string()) - } - async fn on_room_call_answer(&self, _: Room, _: &SyncMessageEvent) { - self.0.lock().await.push("call answer".to_string()) - } - async fn on_room_call_candidates( - &self, - _: Room, - _: &SyncMessageEvent, - ) { - self.0.lock().await.push("call candidates".to_string()) - } - async fn on_room_call_hangup(&self, _: Room, _: &SyncMessageEvent) { - self.0.lock().await.push("call hangup".to_string()) - } - async fn on_room_redaction(&self, _: Room, _: &SyncRedactionEvent) { - self.0.lock().await.push("redaction".to_string()) - } - async fn on_room_power_levels(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("power".to_string()) - } - async fn on_room_tombstone(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("tombstone".to_string()) - } - - async fn on_state_member(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("state member".to_string()) - } - async fn on_state_name(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("state name".to_string()) - } - async fn on_state_canonical_alias( - &self, - _: Room, - _: &SyncStateEvent, - ) { - self.0.lock().await.push("state canonical".to_string()) - } - async fn on_state_aliases(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("state aliases".to_string()) - } - async fn on_state_avatar(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("state avatar".to_string()) - } - async fn on_state_power_levels( - &self, - _: Room, - _: &SyncStateEvent, - ) { - self.0.lock().await.push("state power".to_string()) - } - async fn on_state_join_rules(&self, _: Room, _: &SyncStateEvent) { - self.0.lock().await.push("state rules".to_string()) - } - - // `AnyStrippedStateEvent`s - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomMember` event. - async fn on_stripped_state_member( - &self, - _: Room, - _: &StrippedStateEvent, - _: Option, - ) { - self.0.lock().await.push("stripped state member".to_string()) - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomName` event. - async fn on_stripped_state_name(&self, _: Room, _: &StrippedStateEvent) { - self.0.lock().await.push("stripped state name".to_string()) - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomCanonicalAlias` event. - async fn on_stripped_state_canonical_alias( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - self.0.lock().await.push("stripped state canonical".to_string()) - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomAliases` event. - async fn on_stripped_state_aliases( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - self.0.lock().await.push("stripped state aliases".to_string()) - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomAvatar` event. - async fn on_stripped_state_avatar( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - self.0.lock().await.push("stripped state avatar".to_string()) - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomPowerLevels` event. - async fn on_stripped_state_power_levels( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - self.0.lock().await.push("stripped state power".to_string()) - } - /// Fires when `Client` receives a - /// `AnyStrippedStateEvent::StrippedRoomJoinRules` event. - async fn on_stripped_state_join_rules( - &self, - _: Room, - _: &StrippedStateEvent, - ) { - self.0.lock().await.push("stripped state rules".to_string()) - } - - async fn on_non_room_presence(&self, _: Room, _: &PresenceEvent) { - self.0.lock().await.push("presence".to_string()) - } - async fn on_non_room_ignored_users( - &self, - _: &GlobalAccountDataEvent, - ) { - self.0.lock().await.push("account ignore".to_string()) - } - async fn on_non_room_push_rules(&self, _: &GlobalAccountDataEvent) { - self.0.lock().await.push("account push rules".to_string()) - } - async fn on_non_room_fully_read( - &self, - _: Room, - _: &RoomAccountDataEvent, - ) { - self.0.lock().await.push("account read".to_string()) - } - async fn on_non_room_typing( - &self, - _: Room, - _: &SyncEphemeralRoomEvent, - ) { - self.0.lock().await.push("typing event".to_string()) - } - async fn on_non_room_receipt( - &self, - _: Room, - _: &SyncEphemeralRoomEvent, - ) { - self.0.lock().await.push("receipt event".to_string()) - } - async fn on_presence_event(&self, _: &PresenceEvent) { - self.0.lock().await.push("presence event".to_string()) - } - async fn on_unrecognized_event(&self, _: Room, _: &RawJsonValue) { - self.0.lock().await.push("unrecognized event".to_string()) - } - async fn on_custom_event(&self, _: Room, _: &CustomEvent<'_>) { - self.0.lock().await.push("custom event".to_string()) - } - async fn on_room_notification(&self, _: Room, _: Notification) { - self.0.lock().await.push("notification".to_string()) - } - } - - use crate::{Client, Session, SyncSettings}; - - async fn get_client() -> Client { - let session = Session { - access_token: "1234".to_owned(), - user_id: user_id!("@example:localhost"), - device_id: "DEVICEID".into(), - }; - let homeserver = url::Url::parse(&mockito::server_url()).unwrap(); - let client = Client::new(homeserver).unwrap(); - client.restore_login(session).await.unwrap(); - client - } - - async fn mock_sync(client: &Client, response: String) { - let _m = mock("GET", Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string())) - .with_status(200) - .match_header("authorization", "Bearer 1234") - .with_body(response) - .create(); - - let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync_once(sync_settings).await.unwrap(); - } - - #[async_test] - async fn event_handler_joined() { - let vec = Arc::new(Mutex::new(Vec::new())); - let test_vec = Arc::clone(&vec); - let handler = Box::new(EvHandlerTest(vec)); - - let client = get_client().await; - client.set_event_handler(handler).await; - mock_sync(&client, test_json::SYNC.to_string()).await; - - let v = test_vec.lock().await; - assert_eq!( - v.as_slice(), - [ - "account ignore", - "receipt event", - "account read", - "state rules", - "state member", - "state aliases", - "state power", - "state canonical", - "state member", - "state member", - "message", - "presence event", - "notification", - ], - ) - } - - #[async_test] - async fn event_handler_invite() { - let vec = Arc::new(Mutex::new(Vec::new())); - let test_vec = Arc::clone(&vec); - let handler = Box::new(EvHandlerTest(vec)); - - let client = get_client().await; - client.set_event_handler(handler).await; - mock_sync(&client, test_json::INVITE_SYNC.to_string()).await; - - let v = test_vec.lock().await; - assert_eq!(v.as_slice(), ["stripped state name", "stripped state member", "presence event"],) - } - - #[async_test] - async fn event_handler_leave() { - let vec = Arc::new(Mutex::new(Vec::new())); - let test_vec = Arc::clone(&vec); - let handler = Box::new(EvHandlerTest(vec)); - - let client = get_client().await; - client.set_event_handler(handler).await; - mock_sync(&client, test_json::LEAVE_SYNC.to_string()).await; - - let v = test_vec.lock().await; - assert_eq!( - v.as_slice(), - [ - "account ignore", - "state rules", - "state member", - "state aliases", - "state power", - "state canonical", - "state member", - "state member", - "message", - "presence event", - "notification", - ], - ) - } - - #[async_test] - async fn event_handler_more_events() { - let vec = Arc::new(Mutex::new(Vec::new())); - let test_vec = Arc::clone(&vec); - let handler = Box::new(EvHandlerTest(vec)); - - let client = get_client().await; - client.set_event_handler(handler).await; - mock_sync(&client, test_json::MORE_SYNC.to_string()).await; - - let v = test_vec.lock().await; - assert_eq!( - v.as_slice(), - [ - "receipt event", - "typing event", - "message", - "message", // this is a message edit event - "redaction", - "message", // this is a notice event - ], - ) - } - - #[async_test] - async fn event_handler_voip() { - let vec = Arc::new(Mutex::new(Vec::new())); - let test_vec = Arc::clone(&vec); - let handler = Box::new(EvHandlerTest(vec)); - - let client = get_client().await; - client.set_event_handler(handler).await; - mock_sync(&client, test_json::VOIP_SYNC.to_string()).await; - - let v = test_vec.lock().await; - assert_eq!(v.as_slice(), ["call invite", "call answer", "call candidates", "call hangup",],) - } - - #[async_test] - async fn event_handler_two_syncs() { - let vec = Arc::new(Mutex::new(Vec::new())); - let test_vec = Arc::clone(&vec); - let handler = Box::new(EvHandlerTest(vec)); - - let client = get_client().await; - client.set_event_handler(handler).await; - mock_sync(&client, test_json::SYNC.to_string()).await; - mock_sync(&client, test_json::MORE_SYNC.to_string()).await; - - let v = test_vec.lock().await; - assert_eq!( - v.as_slice(), - [ - "account ignore", - "receipt event", - "account read", - "state rules", - "state member", - "state aliases", - "state power", - "state canonical", - "state member", - "state member", - "message", - "presence event", - "notification", - "receipt event", - "typing event", - "message", - "message", // this is a message edit event - "redaction", - "message", // this is a notice event - "notification", - "notification", - "notification", - ], - ) - } -} diff --git a/matrix_sdk/src/lib.rs b/matrix_sdk/src/lib.rs index be0c099e..92dffbe1 100644 --- a/matrix_sdk/src/lib.rs +++ b/matrix_sdk/src/lib.rs @@ -39,23 +39,24 @@ //! The following crate feature flags are available: //! //! * `encryption`: Enables end-to-end encryption support in the library. -//! * `sled_cryptostore`: Enables a Sled based store for the encryption -//! keys. If this is disabled and `encryption` support is enabled the keys will -//! by default be stored only in memory and thus lost after the client is -//! destroyed. +//! * `sled_cryptostore`: Enables a Sled based store for the encryption keys. If +//! this is disabled and `encryption` support is enabled the keys will by +//! default be stored only in memory and thus lost after the client is +//! destroyed. //! * `markdown`: Support for sending markdown formatted messages. //! * `socks`: Enables SOCKS support in reqwest, the default HTTP client. //! * `sso_login`: Enables SSO login with a local http server. //! * `require_auth_for_profile_requests`: Whether to send the access token in -//! the authentication -//! header when calling endpoints that retrieve profile data. This matches the -//! synapse configuration `require_auth_for_profile_requests`. Enabled by -//! default. +//! the authentication header when calling endpoints that retrieve profile +//! data. This matches the synapse configuration +//! `require_auth_for_profile_requests`. Enabled by default. //! * `appservice`: Enables low-level appservice functionality. For an //! high-level API there's the `matrix-sdk-appservice` crate +//! * `anyhow`: Support for returning `anyhow::Result<()>` from event handlers. #![deny( missing_debug_implementations, + missing_docs, dead_code, missing_docs, trivial_casts, @@ -90,7 +91,7 @@ pub use ruma; mod client; mod error; -mod event_handler; +pub mod event_handler; mod http_client; /// High-level room API pub mod room; @@ -107,7 +108,6 @@ pub use client::{Client, ClientConfig, LoopCtrl, RequestConfig, SyncSettings}; #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub use device::Device; pub use error::{Error, HttpError, Result}; -pub use event_handler::{CustomEvent, EventHandler}; pub use http_client::HttpSend; pub use room_member::RoomMember; #[cfg(not(target_arch = "wasm32"))] diff --git a/matrix_sdk/src/room/room.rs b/matrix_sdk/src/room/room.rs deleted file mode 100644 index 8b137891..00000000 --- a/matrix_sdk/src/room/room.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/matrix_sdk_appservice/examples/appservice_autojoin.rs b/matrix_sdk_appservice/examples/appservice_autojoin.rs index f792f242..a5b8b9ff 100644 --- a/matrix_sdk_appservice/examples/appservice_autojoin.rs +++ b/matrix_sdk_appservice/examples/appservice_autojoin.rs @@ -2,7 +2,6 @@ use std::{convert::TryFrom, env}; use matrix_sdk_appservice::{ matrix_sdk::{ - async_trait, room::Room, ruma::{ events::{ @@ -11,51 +10,27 @@ use matrix_sdk_appservice::{ }, UserId, }, - EventHandler, }, - AppService, AppServiceRegistration, + AppService, AppServiceRegistration, Result, }; -use tracing::{error, trace}; +use tracing::trace; -struct AppServiceEventHandler { +pub async fn handle_room_member( appservice: AppService, -} + room: Room, + event: SyncStateEvent, +) -> Result<()> { + if !appservice.user_id_is_in_namespace(&event.state_key)? { + trace!("not an appservice user: {}", event.state_key); + } else if let MembershipState::Invite = event.content.membership { + let user_id = UserId::try_from(event.state_key.as_str())?; + appservice.register_virtual_user(user_id.localpart()).await?; -impl AppServiceEventHandler { - pub fn new(appservice: AppService) -> Self { - Self { appservice } + let client = appservice.virtual_user_client(user_id.localpart()).await?; + client.join_room_by_id(room.room_id()).await?; } - pub async fn handle_room_member( - &self, - room: Room, - event: &SyncStateEvent, - ) -> Result<(), Box> { - if !self.appservice.user_id_is_in_namespace(&event.state_key)? { - trace!("not an appservice user: {}", event.state_key); - } else if let MembershipState::Invite = event.content.membership { - let user_id = UserId::try_from(event.state_key.clone())?; - - let appservice = self.appservice.clone(); - appservice.register_virtual_user(user_id.localpart()).await?; - - let client = appservice.virtual_user_client(user_id.localpart()).await?; - - client.join_room_by_id(room.room_id()).await?; - } - - Ok(()) - } -} - -#[async_trait] -impl EventHandler for AppServiceEventHandler { - async fn on_room_member(&self, room: Room, event: &SyncStateEvent) { - match self.handle_room_member(room, event).await { - Ok(_) => (), - Err(error) => error!("{:?}", error), - } - } + Ok(()) } #[tokio::main] @@ -68,7 +43,14 @@ pub async fn main() -> Result<(), Box> { let registration = AppServiceRegistration::try_from_yaml_file("./tests/registration.yaml")?; let mut appservice = AppService::new(homeserver_url, server_name, registration).await?; - appservice.set_event_handler(Box::new(AppServiceEventHandler::new(appservice.clone()))).await?; + appservice + .register_event_handler({ + let appservice = appservice.clone(); + move |event: SyncStateEvent, room: Room| { + handle_room_member(appservice.clone(), room, event) + } + }) + .await?; let (host, port) = appservice.registration().get_host_and_port()?; appservice.run(host, port).await?; diff --git a/matrix_sdk_appservice/src/lib.rs b/matrix_sdk_appservice/src/lib.rs index 71de60ec..037d292e 100644 --- a/matrix_sdk_appservice/src/lib.rs +++ b/matrix_sdk_appservice/src/lib.rs @@ -34,14 +34,10 @@ //! ```no_run //! # async { //! # -//! # use matrix_sdk::{async_trait, EventHandler}; -//! # -//! # struct MyEventHandler; -//! # -//! # #[async_trait] -//! # impl EventHandler for MyEventHandler {} -//! # -//! use matrix_sdk_appservice::{AppService, AppServiceRegistration}; +//! use matrix_sdk_appservice::{ +//! ruma::events::{SyncStateEvent, room::member::MemberEventContent}, +//! AppService, AppServiceRegistration +//! }; //! //! let homeserver_url = "http://127.0.0.1:8008"; //! let server_name = "localhost"; @@ -59,7 +55,9 @@ //! ")?; //! //! let mut appservice = AppService::new(homeserver_url, server_name, registration).await?; -//! appservice.set_event_handler(Box::new(MyEventHandler)).await?; +//! appservice.register_event_handler(|_ev: SyncStateEvent| async { +//! // do stuff +//! }); //! //! let (host, port) = appservice.registration().get_host_and_port()?; //! appservice.run(host, port).await?; @@ -80,6 +78,7 @@ compile_error!("one webserver feature must be enabled. available ones: `warp`"); use std::{ convert::{TryFrom, TryInto}, fs::File, + future::Future, ops::Deref, path::PathBuf, sync::Arc, @@ -92,7 +91,10 @@ pub use matrix_sdk; #[doc(no_inline)] pub use matrix_sdk::ruma; use matrix_sdk::{ - bytes::Bytes, reqwest::Url, Client, ClientConfig, EventHandler, HttpError, Session, + bytes::Bytes, + event_handler::{EventHandler, EventHandlerResult, SyncEvent}, + reqwest::Url, + Client, ClientConfig, HttpError, Session, }; use regex::Regex; use ruma::{ @@ -106,12 +108,13 @@ use ruma::{ }, assign, identifiers, DeviceId, ServerNameBox, UserId, }; +use serde::de::DeserializeOwned; use tracing::{info, warn}; mod error; mod webserver; -pub type Result = std::result::Result; +pub type Result = std::result::Result; pub type Host = String; pub type Port = u16; @@ -354,8 +357,8 @@ impl AppService { Ok(entry.value().clone()) } - /// Convenience wrapper around [`Client::set_event_handler()`] that attaches - /// the event handler to the [`MainUser`]'s [`Client`] + /// Convenience wrapper around [`Client::register_event_handler()`] that + /// attaches the event handler to the [`MainUser`]'s [`Client`] /// /// Note that the event handler in the [`AppService`] context only triggers /// [`join` room `timeline` events], so no state events or events from the @@ -370,10 +373,14 @@ impl AppService { /// /// [`join` room `timeline` events]: https://spec.matrix.org/unstable/client-server-api/#get_matrixclientr0sync /// [MSC2409]: https://github.com/matrix-org/matrix-doc/pull/2409 - pub async fn set_event_handler(&mut self, handler: Box) -> Result<()> { + pub async fn register_event_handler(&mut self, handler: H) -> Result<()> + where + Ev: SyncEvent + DeserializeOwned + Send + 'static, + H: EventHandler, + ::Output: EventHandlerResult, + { let client = self.get_cached_client(None)?; - - client.set_event_handler(handler).await; + client.register_event_handler(handler).await; Ok(()) } diff --git a/matrix_sdk_appservice/tests/tests.rs b/matrix_sdk_appservice/tests/tests.rs index 8174daef..8551c65d 100644 --- a/matrix_sdk_appservice/tests/tests.rs +++ b/matrix_sdk_appservice/tests/tests.rs @@ -1,13 +1,14 @@ -use std::sync::{Arc, Mutex}; +use std::{ + future, + sync::{Arc, Mutex}, +}; use matrix_sdk::{ - async_trait, - room::Room, ruma::{ api::appservice::Registration, events::{room::member::MemberEventContent, SyncStateEvent}, }, - ClientConfig, EventHandler, RequestConfig, + ClientConfig, RequestConfig, }; use matrix_sdk_appservice::*; use matrix_sdk_test::{appservice::TransactionBuilder, async_test, EventsJson}; @@ -203,28 +204,17 @@ async fn test_no_access_token() -> Result<()> { async fn test_event_handler() -> Result<()> { let mut appservice = appservice(None).await?; - #[derive(Clone)] - struct Example { - pub on_state_member: Arc>, - } - - impl Example { - pub fn new() -> Self { - #[allow(clippy::mutex_atomic)] - Self { on_state_member: Arc::new(Mutex::new(false)) } - } - } - - #[async_trait] - impl EventHandler for Example { - async fn on_room_member(&self, _: Room, _: &SyncStateEvent) { - let on_state_member = self.on_state_member.clone(); - *on_state_member.lock().unwrap() = true; - } - } - - let example = Example::new(); - appservice.set_event_handler(Box::new(example.clone())).await?; + #[allow(clippy::mutex_atomic)] + let on_state_member = Arc::new(Mutex::new(false)); + appservice + .register_event_handler({ + let on_state_member = on_state_member.clone(); + move |_ev: SyncStateEvent| { + *on_state_member.lock().unwrap() = true; + future::ready(()) + } + }) + .await?; let uri = "/_matrix/app/v1/transactions/1?access_token=hs_token"; @@ -241,7 +231,7 @@ async fn test_event_handler() -> Result<()> { .await .unwrap(); - let on_room_member_called = *example.on_state_member.lock().unwrap(); + let on_room_member_called = *on_state_member.lock().unwrap(); assert!(on_room_member_called); Ok(()) diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index 399b5348..3b77fd44 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -50,10 +50,9 @@ use ruma::{ use ruma::{ api::client::r0::{self as api, push::get_notifications::Notification}, events::{ - room::member::{MemberEventContent, MembershipState}, - AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, - AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType, - StateEvent, + room::member::MembershipState, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, + AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent, + EventContent, EventType, }, push::{Action, PushConditionRoomCtx, Ruleset}, serde::Raw, @@ -71,97 +70,6 @@ use crate::{ pub type Token = String; -/// A deserialization wrapper for extracting the prev_content field when -/// found in an `unsigned` field. -/// -/// Represents the outer `unsigned` field -#[derive(serde::Deserialize)] -pub struct AdditionalEventData { - unsigned: AdditionalUnsignedData, -} - -/// A deserialization wrapper for extracting the prev_content field when -/// found in an `unsigned` field. -/// -/// Represents the inner `prev_content` field -#[derive(serde::Deserialize)] -pub struct AdditionalUnsignedData { - pub prev_content: Option>, -} - -/// Transform an `AnySyncStateEvent` by hoisting `prev_content` field from -/// `unsigned` to the top level. -/// -/// Due to a [bug in synapse][synapse-bug], `prev_content` often ends up in -/// `unsigned` contrary to the C2S spec. Some more discussion can be found -/// [here][discussion]. Until this is fixed in synapse or handled in Ruma, we -/// use this to hoist up `prev_content` to the top level. -/// -/// [synapse-bug]: -/// [discussion]: -pub fn hoist_and_deserialize_state_event( - event: &Raw, -) -> StdResult { - let prev_content = event.deserialize_as::()?.unsigned.prev_content; - - let mut ev = event.deserialize()?; - - if let AnySyncStateEvent::RoomMember(ref mut member) = ev { - if member.prev_content.is_none() { - member.prev_content = prev_content.and_then(|e| e.deserialize().ok()); - } - } - - Ok(ev) -} - -fn hoist_member_event( - event: &Raw>, -) -> StdResult, serde_json::Error> { - let prev_content = event.deserialize_as::()?.unsigned.prev_content; - - let mut e = event.deserialize()?; - - if e.prev_content.is_none() { - e.prev_content = prev_content.and_then(|e| e.deserialize().ok()); - } - - Ok(e) -} - -/// Transform an `AnySyncRoomEvent` by hoisting `prev_content` field from -/// `unsigned` to the top level. -/// -/// Due to a [bug in synapse][synapse-bug], `prev_content` often ends up in -/// `unsigned` contrary to the C2S spec. Some more discussion can be found -/// [here][discussion]. Until this is fixed in synapse or handled in Ruma, we -/// use this to hoist up `prev_content` to the top level. -/// -/// [synapse-bug]: -/// [discussion]: -pub fn hoist_room_event_prev_content( - event: &Raw, -) -> StdResult { - let prev_content = event - .deserialize_as::() - .map(|more_unsigned| more_unsigned.unsigned) - .map(|additional| additional.prev_content)? - .and_then(|p| p.deserialize().ok()); - - let mut ev = event.deserialize()?; - - match &mut ev { - AnySyncRoomEvent::State(AnySyncStateEvent::RoomMember(ref mut member)) - if member.prev_content.is_none() => - { - member.prev_content = prev_content; - } - _ => (), - } - - Ok(ev) -} - /// A no IO Client implementation. /// /// This Client is a state machine that receives responses and events and @@ -445,7 +353,7 @@ impl BaseClient { #[allow(unused_mut)] let mut event: SyncRoomEvent = event.into(); - match hoist_room_event_prev_content(&event.event) { + match event.event.deserialize() { Ok(e) => { #[allow(clippy::single_match)] match &e { @@ -611,7 +519,7 @@ impl BaseClient { let room_id = room_info.room_id.clone(); for raw_event in events { - let event = match hoist_and_deserialize_state_event(raw_event) { + let event = match raw_event.deserialize() { Ok(e) => e, Err(e) => { warn!( @@ -976,7 +884,7 @@ impl BaseClient { let members: Vec = response .chunk .iter() - .filter_map(|e| hoist_member_event(e).ok().and_then(|e| MemberEvent::try_from(e).ok())) + .filter_map(|e| e.deserialize().ok().and_then(|e| MemberEvent::try_from(e).ok())) .collect(); let mut ambiguity_cache = AmbiguityCache::new(self.store.clone()); diff --git a/matrix_sdk_base/src/lib.rs b/matrix_sdk_base/src/lib.rs index 051f2181..0b3c9991 100644 --- a/matrix_sdk_base/src/lib.rs +++ b/matrix_sdk_base/src/lib.rs @@ -15,7 +15,7 @@ //! This crate implements a [Matrix](https://matrix.org/) client library. //! -//! ## Crate Feature Flags +//! ## Crate Feature Flags //! //! The following crate feature flags are available: //! @@ -50,9 +50,7 @@ mod rooms; mod session; mod store; -pub use client::{ - hoist_and_deserialize_state_event, hoist_room_event_prev_content, BaseClient, BaseClientConfig, -}; +pub use client::{BaseClient, BaseClientConfig}; #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub use matrix_sdk_crypto as crypto; diff --git a/matrix_sdk_base/src/store/memory_store.rs b/matrix_sdk_base/src/store/memory_store.rs index 311e6373..e2e7e303 100644 --- a/matrix_sdk_base/src/store/memory_store.rs +++ b/matrix_sdk_base/src/store/memory_store.rs @@ -72,23 +72,23 @@ impl MemoryStore { #[allow(dead_code)] pub fn new() -> Self { Self { - sync_token: Arc::new(RwLock::new(None)), - filters: DashMap::new().into(), - account_data: DashMap::new().into(), - members: DashMap::new().into(), - profiles: DashMap::new().into(), - display_names: DashMap::new().into(), - joined_user_ids: DashMap::new().into(), - invited_user_ids: DashMap::new().into(), - room_info: DashMap::new().into(), - room_state: DashMap::new().into(), - room_account_data: DashMap::new().into(), - stripped_room_info: DashMap::new().into(), - stripped_room_state: DashMap::new().into(), - stripped_members: DashMap::new().into(), - presence: DashMap::new().into(), - room_user_receipts: DashMap::new().into(), - room_event_receipts: DashMap::new().into(), + sync_token: Default::default(), + filters: Default::default(), + account_data: Default::default(), + members: Default::default(), + profiles: Default::default(), + display_names: Default::default(), + joined_user_ids: Default::default(), + invited_user_ids: Default::default(), + room_info: Default::default(), + room_state: Default::default(), + room_account_data: Default::default(), + stripped_room_info: Default::default(), + stripped_room_state: Default::default(), + stripped_members: Default::default(), + presence: Default::default(), + room_user_receipts: Default::default(), + room_event_receipts: Default::default(), media: Arc::new(Mutex::new(LruCache::new(100))), } } diff --git a/matrix_sdk_base/src/store/mod.rs b/matrix_sdk_base/src/store/mod.rs index 17c77c1a..8d930722 100644 --- a/matrix_sdk_base/src/store/mod.rs +++ b/matrix_sdk_base/src/store/mod.rs @@ -310,15 +310,12 @@ pub struct Store { impl Store { fn new(inner: Box) -> Self { - let session = Arc::new(RwLock::new(None)); - let sync_token = Arc::new(RwLock::new(None)); - Self { inner: inner.into(), - session, - sync_token, - rooms: DashMap::new().into(), - stripped_rooms: DashMap::new().into(), + session: Default::default(), + sync_token: Default::default(), + rooms: Default::default(), + stripped_rooms: Default::default(), } } diff --git a/matrix_sdk_crypto/src/gossiping/machine.rs b/matrix_sdk_crypto/src/gossiping/machine.rs index 818b7959..fb334694 100644 --- a/matrix_sdk_crypto/src/gossiping/machine.rs +++ b/matrix_sdk_crypto/src/gossiping/machine.rs @@ -76,8 +76,8 @@ impl GossipMachine { device_id, store, outbound_group_sessions, - outgoing_requests: DashMap::new().into(), - incoming_key_requests: DashMap::new().into(), + outgoing_requests: Default::default(), + incoming_key_requests: Default::default(), wait_queue: WaitQueue::new(), users_for_key_claim, } diff --git a/matrix_sdk_crypto/src/session_manager/group_sessions.rs b/matrix_sdk_crypto/src/session_manager/group_sessions.rs index 79b1af74..c5a21e59 100644 --- a/matrix_sdk_crypto/src/session_manager/group_sessions.rs +++ b/matrix_sdk_crypto/src/session_manager/group_sessions.rs @@ -49,11 +49,7 @@ pub(crate) struct GroupSessionCache { impl GroupSessionCache { pub(crate) fn new(store: Store) -> Self { - Self { - store, - sessions: DashMap::new().into(), - sessions_being_shared: Arc::new(DashMap::new()), - } + Self { store, sessions: Default::default(), sessions_being_shared: Default::default() } } pub(crate) fn insert(&self, session: OutboundGroupSession) { diff --git a/matrix_sdk_crypto/src/session_manager/sessions.rs b/matrix_sdk_crypto/src/session_manager/sessions.rs index ab0af01e..da11a9eb 100644 --- a/matrix_sdk_crypto/src/session_manager/sessions.rs +++ b/matrix_sdk_crypto/src/session_manager/sessions.rs @@ -64,8 +64,8 @@ impl SessionManager { store, key_request_machine, users_for_key_claim, - wedged_devices: Arc::new(DashMap::new()), - outgoing_to_device_requests: Arc::new(DashMap::new()), + wedged_devices: Default::default(), + outgoing_to_device_requests: Default::default(), } } diff --git a/matrix_sdk_crypto/src/store/caches.rs b/matrix_sdk_crypto/src/store/caches.rs index abdd972c..35f30a56 100644 --- a/matrix_sdk_crypto/src/store/caches.rs +++ b/matrix_sdk_crypto/src/store/caches.rs @@ -37,7 +37,7 @@ pub struct SessionStore { impl SessionStore { /// Create a new empty Session store. pub fn new() -> Self { - SessionStore { entries: Arc::new(DashMap::new()) } + Self::default() } /// Add a session to the store. @@ -82,7 +82,7 @@ pub struct GroupSessionStore { impl GroupSessionStore { /// Create a new empty store. pub fn new() -> Self { - GroupSessionStore { entries: Arc::new(DashMap::new()) } + Self::default() } /// Add an inbound group session to the store. @@ -141,7 +141,7 @@ pub struct DeviceStore { impl DeviceStore { /// Create a new empty device store. pub fn new() -> Self { - DeviceStore { entries: Arc::new(DashMap::new()) } + Self::default() } /// Add a device to the store. diff --git a/matrix_sdk_crypto/src/store/memorystore.rs b/matrix_sdk_crypto/src/store/memorystore.rs index 22eeb74a..e1dc0672 100644 --- a/matrix_sdk_crypto/src/store/memorystore.rs +++ b/matrix_sdk_crypto/src/store/memorystore.rs @@ -59,13 +59,13 @@ impl Default for MemoryStore { MemoryStore { sessions: SessionStore::new(), inbound_group_sessions: GroupSessionStore::new(), - tracked_users: Arc::new(DashSet::new()), - users_for_key_query: Arc::new(DashSet::new()), - olm_hashes: Arc::new(DashMap::new()), + tracked_users: Default::default(), + users_for_key_query: Default::default(), + olm_hashes: Default::default(), devices: DeviceStore::new(), - identities: Arc::new(DashMap::new()), - outgoing_key_requests: Arc::new(DashMap::new()), - key_requests_by_info: Arc::new(DashMap::new()), + identities: Default::default(), + outgoing_key_requests: Default::default(), + key_requests_by_info: Default::default(), } } } diff --git a/matrix_sdk_crypto/src/verification/cache.rs b/matrix_sdk_crypto/src/verification/cache.rs index 2124969b..24868817 100644 --- a/matrix_sdk_crypto/src/verification/cache.rs +++ b/matrix_sdk_crypto/src/verification/cache.rs @@ -33,7 +33,7 @@ pub struct VerificationCache { impl VerificationCache { pub fn new() -> Self { - Self { verification: DashMap::new().into(), outgoing_requests: DashMap::new().into() } + Self { verification: Default::default(), outgoing_requests: Default::default() } } #[cfg(test)] diff --git a/matrix_sdk_crypto/src/verification/machine.rs b/matrix_sdk_crypto/src/verification/machine.rs index d3cb603b..1624a9e0 100644 --- a/matrix_sdk_crypto/src/verification/machine.rs +++ b/matrix_sdk_crypto/src/verification/machine.rs @@ -62,7 +62,7 @@ impl VerificationMachine { private_identity: identity, store: VerificationStore { account, inner: store }, verifications: VerificationCache::new(), - requests: DashMap::new().into(), + requests: Default::default(), } }