diff --git a/design.md b/design.md index 2686f513..fa084fa9 100644 --- a/design.md +++ b/design.md @@ -11,15 +11,15 @@ The highest level structure that ties the other pieces of functionality together - make raw Http requests #### Base Client/Client State Machine -In addition to Http the `AsyncClient` passes along methods from the `BaseClient` that deal with `Room`s and `RoomMember`s. This allows the client to keep track of more complicated information that needs to be calculated in some way. - - human readable room names +In addition to Http, the `AsyncClient` passes along methods from the `BaseClient` that deal with `Room`s and `RoomMember`s. This allows the client to keep track of more complicated information that needs to be calculated in some way. + - human-readable room names - power level? - ignored list? - push rulesset? - more? #### Crypto State Machine -Given a Matrix response the crypto machine will update it's internal state, along with encryption information this means keeping track of when to encrypt. It has knowledge of when encryption needs to happen and can be asked from the `BaseClient`. The crypto state machine is given responses that relate to encryption and can create encrypted request bodies for encryption related requests. Basically it tells the `BaseClient` to send a to-device messages out and the `BaseClient` is responsible for notifying the crypto state machine when it sent the message so crypto can update state. +Given a Matrix response the crypto machine will update its own internal state, along with encryption information. `BaseClient` and the crypto machine together keep track of when to encrypt. It knows when encryption needs to happen based on signals from the `BaseClient`. The crypto state machine is given responses that relate to encryption and can create encrypted request bodies for encryption-related requests. Basically it tells the `BaseClient` to send to-device messages out, and the `BaseClient` is responsible for notifying the crypto state machine when it sent the message so crypto can update state. #### Client State/Room and RoomMember The `BaseClient` is responsible for keeping state in sync through the `IncomingResponse`s of `AsyncClient` or querying the `StateStore`. By processing and then delegating incoming `RoomEvent`s, `StateEvent`s, `PresenceEvent`, `IncomingAccountData` and `EphemeralEvent`s to the correct `Room` in the base clients `HashMap` or further to `Room`'s `RoomMember` via the members `HashMap`. The `BaseClient` is also responsible for emitting the incoming events to the `EventEmitter` trait. @@ -87,11 +87,14 @@ pub struct RoomMember { ``` #### State Store -The `BaseClient` also has access to a `dyn StateStore` this is an abstraction around a "database" to keep client state without requesting a full sync from the server on start up. A default implementation that serializes/deserializes json to files in a specified directory can be used. The user can also implement `StateStore` to fit any storage solution they choose. - - load - - store/save +The `BaseClient` also has access to a `dyn StateStore` this is an abstraction around a "database" to keep the client state without requesting a full sync from the server on startup. A default implementation that serializes/deserializes JSON to files in a specified directory can be used. The user can also implement `StateStore` to fit any storage solution they choose. The base client handles the storage automatically. There "may be/are TODO" ways for the user to interact directly. The room event handling methods signal if the state was modified; if so, we check if some room state file needs to be overwritten. + - open + - load client/rooms + - store client/room - update ?? +The state store will restore our client state in the `BaseClient` and client authors can just get the latest state that they want to present from the client object. No need to ask the state store for it, this may change if custom setups request this. `StateStore`'s main purpose is to provide load/store functionality and, internally to the crate, update the `BaseClient`. + #### Event Emitter -The consumer of this crate can implement the `EventEmitter` trait for full control over how incoming events are handled by their client. If that isn't enough it is possible to receive every incoming response with the `AsyncClient::sync_forever` callback. +The consumer of this crate can implement the `EventEmitter` trait for full control over how incoming events are handled by their client. If that isn't enough, it is possible to receive every incoming response with the `AsyncClient::sync_forever` callback. - list the methods for `EventEmitter`? diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index 4bd7b2c2..53587745 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -16,11 +16,14 @@ encryption = ["matrix-sdk-crypto"] sqlite-cryptostore = ["matrix-sdk-crypto/sqlite-cryptostore"] [dependencies] +dirs = "2.0.2" futures = "0.3.4" reqwest = "0.10.4" http = "0.2.1" url = "2.1.1" async-trait = "0.1.30" +serde = "1.0.106" +serde_json = "1.0.51" matrix-sdk-types = { path = "../matrix_sdk_types" } matrix-sdk-crypto = { path = "../matrix_sdk_crypto", optional = true } @@ -47,7 +50,7 @@ features = ["std", "std-future"] [dependencies.tokio] version = "0.2.16" default-features = false -features = ["sync", "time"] +features = ["sync", "time", "fs"] [dependencies.sqlx] version = "0.3.3" @@ -62,3 +65,4 @@ serde_json = "1.0.51" tracing-subscriber = "0.2.4" tempfile = "3.1.0" mockito = "0.25.1" +lazy_static = "1.4.0" diff --git a/matrix_sdk/examples/command_bot.rs b/matrix_sdk/examples/command_bot.rs index fded958e..a238dac5 100644 --- a/matrix_sdk/examples/command_bot.rs +++ b/matrix_sdk/examples/command_bot.rs @@ -4,7 +4,7 @@ use std::{env, process::exit}; use matrix_sdk::{ self, events::room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, - AsyncClient, AsyncClientConfig, EventEmitter, Room, SyncSettings, + AsyncClient, AsyncClientConfig, EventEmitter, JsonStore, Room, SyncSettings, }; use tokio::sync::RwLock; use url::Url; @@ -63,9 +63,15 @@ async fn login_and_sync( username: String, password: String, ) -> Result<(), matrix_sdk::Error> { + // the location for `JsonStore` to save files to + let mut home = dirs::home_dir().expect("no home directory found"); + home.push("party_bot"); + + let store = JsonStore::open(&home)?; let client_config = AsyncClientConfig::new() .proxy("http://localhost:8080")? - .disable_ssl_verification(); + .disable_ssl_verification() + .state_store(Box::new(store)); let homeserver_url = Url::parse(&homeserver_url)?; // create a new AsyncClient with the given homeserver url and config @@ -82,7 +88,9 @@ async fn login_and_sync( println!("logged in as {}", username); - // initial sync to set up state and so our bot doesn't respond to old messages + // An initial sync to set up state and so our bot doesn't respond to old messages. + // If the `StateStore` finds saved state in the location given the initial sync will + // be skipped in favor of loading state from the store client.sync(SyncSettings::default()).await.unwrap(); // add our CommandBot to be notified of incoming messages, we do this after the initial // sync to avoid responding to messages before the bot was running. diff --git a/matrix_sdk/src/async_client.rs b/matrix_sdk/src/async_client.rs index 0c8d0244..972be28a 100644 --- a/matrix_sdk/src/async_client.rs +++ b/matrix_sdk/src/async_client.rs @@ -17,6 +17,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; +use std::ops::Deref; use std::result::Result as StdResult; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -48,6 +49,7 @@ use crate::api; use crate::base_client::Client as BaseClient; use crate::models::Room; use crate::session::Session; +use crate::state::{ClientState, StateStore}; use crate::VERSION; use crate::{Error, EventEmitter, Result}; @@ -72,9 +74,12 @@ impl std::fmt::Debug for AsyncClient { } } -#[derive(Default, Debug)] +#[derive(Default)] /// Configuration for the creation of the `AsyncClient`. /// +/// When setting the `StateStore` it is up to the user to open/connect +/// the storage backend before client creation. +/// /// # Example /// /// ``` @@ -86,10 +91,29 @@ impl std::fmt::Debug for AsyncClient { /// .unwrap() /// .disable_ssl_verification(); /// ``` +/// An example of adding a default `JsonStore` to the `AsyncClient`. +/// ```no_run +/// # use matrix_sdk::{AsyncClientConfig, JsonStore}; +/// +/// let store = JsonStore::open("path/to/json").unwrap(); +/// let client_config = AsyncClientConfig::new() +/// .state_store(Box::new(store)); +/// ``` pub struct AsyncClientConfig { proxy: Option, user_agent: Option, disable_ssl_verification: bool, + state_store: Option>, +} + +impl std::fmt::Debug for AsyncClientConfig { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> StdResult<(), std::fmt::Error> { + fmt.debug_struct("AsyncClientConfig") + .field("proxy", &self.proxy) + .field("user_agent", &self.user_agent) + .field("disable_ssl_verification", &self.disable_ssl_verification) + .finish() + } } impl AsyncClientConfig { @@ -131,6 +155,14 @@ impl AsyncClientConfig { self.user_agent = Some(HeaderValue::from_str(user_agent)?); Ok(self) } + + /// Set a custom implementation of a `StateStore`. + /// + /// The state store should be opened before being set. + pub fn state_store(mut self, store: Box) -> Self { + self.state_store = Some(store); + self + } } #[derive(Debug, Default, Clone)] @@ -255,10 +287,16 @@ impl AsyncClient { let http_client = http_client.default_headers(headers).build()?; + let mut base_client = BaseClient::new(session)?; + + if let Some(store) = config.state_store { + base_client.state_store = Some(store); + }; + Ok(Self { homeserver, http_client, - base_client: Arc::new(RwLock::new(BaseClient::new(session)?)), + base_client: Arc::new(RwLock::new(base_client)), }) } @@ -306,6 +344,33 @@ impl AsyncClient { self.base_client.read().await.joined_rooms.clone() } + /// This allows `AsyncClient` to manually sync state with the provided `StateStore`. + /// + /// Returns true when a successful `StateStore` sync has completed. + /// # Examples + /// + /// ```no_run + /// use matrix_sdk::{AsyncClient, AsyncClientConfig, JsonStore, RoomBuilder}; + /// # use matrix_sdk::api::r0::room::Visibility; + /// # use url::Url; + /// + /// # let homeserver = Url::parse("http://example.com").unwrap(); + /// let store = JsonStore::open("path/to/store").unwrap(); + /// let config = AsyncClientConfig::new().state_store(Box::new(store)); + /// let mut cli = AsyncClient::new(homeserver, None).unwrap(); + /// # use futures::executor::block_on; + /// # block_on(async { + /// let _ = cli.login("name", "password", None, None).await.unwrap(); + /// // returns true when a state store sync is successful + /// assert!(cli.sync_with_state_store().await.unwrap()); + /// // now state is restored without a request to the server + /// assert_eq!(vec!["room".to_string(), "names".to_string()], cli.get_room_names().await) + /// # }); + /// ``` + pub async fn sync_with_state_store(&self) -> Result { + self.base_client.write().await.sync_with_state_store().await + } + /// Login to the server. /// /// # Arguments @@ -339,6 +404,7 @@ impl AsyncClient { let response = self.send(request).await?; let mut client = self.base_client.write().await; + client.receive_login_response(&response).await?; Ok(response) @@ -549,11 +615,27 @@ impl AsyncClient { /// Synchronize the client's state with the latest state on the server. /// + /// If a `StateStore` is provided and this is the initial sync state will + /// be loaded from the state store. + /// /// # Arguments /// /// * `sync_settings` - Settings for the sync call. #[instrument] - pub async fn sync(&self, sync_settings: SyncSettings) -> Result { + pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result { + { + // if the client has been synced from the state store don't sync again + if !self.base_client.read().await.is_state_store_synced() { + // this will bail out returning false if the store has not been set up + if let Ok(synced) = self.sync_with_state_store().await { + if synced { + // once synced, update the sync token to the last known state from `StateStore`. + sync_settings.token = self.sync_token().await; + } + } + } + } + let request = sync_events::Request { filter: None, since: sync_settings.token, @@ -564,12 +646,15 @@ impl AsyncClient { let mut response = self.send(request).await?; + let mut updated = false; for (room_id, room) in &mut response.rooms.join { let matrix_room = { let mut client = self.base_client.write().await; for event in &room.state.events { if let Ok(e) = event.deserialize() { - client.receive_joined_state_event(&room_id, &e).await; + if client.receive_joined_state_event(&room_id, &e).await { + updated = true; + } } } @@ -590,9 +675,14 @@ impl AsyncClient { for mut event in &mut room.timeline.events { let decrypted_event = { let mut client = self.base_client.write().await; - client - .receive_joined_timeline_event(room_id, &mut event) - .await + let mut timeline_update = false; + let decrypt_ev = client + .receive_joined_timeline_event(room_id, &mut event, &mut timeline_update) + .await; + if timeline_update { + updated = true; + }; + decrypt_ev }; if let Some(e) = decrypted_event { @@ -610,8 +700,10 @@ impl AsyncClient { { if let Ok(e) = account_data.deserialize() { let mut client = self.base_client.write().await; - client.receive_account_data_event(&room_id, &e).await; - client.emit_account_data_event(&room_id, &e).await; + if client.receive_account_data_event(&room_id, &e).await { + updated = true; + } + client.emit_account_data_event(room_id, &e).await; } } } @@ -623,7 +715,9 @@ impl AsyncClient { { if let Ok(e) = presence.deserialize() { let mut client = self.base_client.write().await; - client.receive_presence_event(&room_id, &e).await; + if client.receive_presence_event(&room_id, &e).await { + updated = true; + } client.emit_presence_event(&room_id, &e).await; } @@ -634,17 +728,33 @@ impl AsyncClient { { if let Ok(e) = ephemeral.deserialize() { let mut client = self.base_client.write().await; - client.receive_ephemeral_event(&room_id, &e).await; + if client.receive_ephemeral_event(&room_id, &e).await { + updated = true; + } client.emit_ephemeral_event(&room_id, &e).await; } } } + + if updated { + if let Some(store) = self.base_client.read().await.state_store.as_ref() { + store + .store_room_state(matrix_room.read().await.deref()) + .await?; + } + } } let mut client = self.base_client.write().await; client.receive_sync_response(&mut response).await; + if updated { + if let Some(store) = client.state_store.as_ref() { + let state = ClientState::from_base_client(&client); + store.store_client_state(state).await?; + } + } Ok(response) } diff --git a/matrix_sdk/src/base_client.rs b/matrix_sdk/src/base_client.rs index 2d5ff58d..a7d1d3d6 100644 --- a/matrix_sdk/src/base_client.rs +++ b/matrix_sdk/src/base_client.rs @@ -34,6 +34,7 @@ use crate::events::EventJson; use crate::identifiers::{RoomId, UserId}; use crate::models::Room; use crate::session::Session; +use crate::state::{ClientState, StateStore}; use crate::EventEmitter; #[cfg(feature = "encryption")] @@ -75,6 +76,10 @@ pub struct Client { /// Any implementor of EventEmitter will act as the callbacks for various /// events. pub event_emitter: Option>, + /// + pub state_store: Option>, + /// Does the `Client` need to sync with the state store. + needs_state_store_sync: bool, #[cfg(feature = "encryption")] olm: Arc>>, @@ -114,6 +119,8 @@ impl Client { ignored_users: Vec::new(), push_ruleset: None, event_emitter: None, + state_store: None, + needs_state_store_sync: true, #[cfg(feature = "encryption")] olm: Arc::new(Mutex::new(olm)), }) @@ -131,6 +138,54 @@ impl Client { self.event_emitter = Some(emitter); } + /// Returns true if the state store has been loaded into the client. + pub fn is_state_store_synced(&self) -> bool { + !self.needs_state_store_sync + } + + /// When a client is provided the state store will load state from the `StateStore`. + /// + /// Returns `true` when a sync has successfully completed. + pub(crate) async fn sync_with_state_store(&mut self) -> Result { + if let Some(store) = self.state_store.as_ref() { + if let Some(client_state) = store.load_client_state().await? { + let ClientState { + user_id, + device_id, + sync_token, + ignored_users, + push_ruleset, + } = client_state; + + if let Some(sess) = self.session.as_mut() { + if let Some(device) = device_id { + sess.device_id = device; + } + if let Some(user) = user_id { + sess.user_id = user; + } + } + self.sync_token = sync_token; + self.ignored_users = ignored_users; + self.push_ruleset = push_ruleset; + } else { + // return false and continues with a sync request then save the state and create + // and populate the files during the sync + return Ok(false); + } + + let mut rooms = store.load_all_rooms().await?; + self.joined_rooms = rooms + .drain() + .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) + .collect(); + + self.needs_state_store_sync = false; + } + + Ok(!self.needs_state_store_sync) + } + /// Receive a login response and update the session of the client. /// /// # Arguments @@ -231,10 +286,14 @@ impl Client { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. + /// + /// * `did_update` - This is used internally to confirm when the state has + /// been updated. pub async fn receive_joined_timeline_event( &mut self, room_id: &RoomId, event: &mut EventJson, + did_update: &mut bool, ) -> Option> { match event.deserialize() { #[allow(unused_mut)] @@ -260,7 +319,8 @@ impl Client { } let mut room = self.get_or_create_room(&room_id).write().await; - room.receive_timeline_event(&e); + // TODO is passing in the bool to use in `AsyncClient::sync` ok here + *did_update = room.receive_timeline_event(&e); decrypted_event } _ => None, diff --git a/matrix_sdk/src/error.rs b/matrix_sdk/src/error.rs index ecbfaa5b..56682882 100644 --- a/matrix_sdk/src/error.rs +++ b/matrix_sdk/src/error.rs @@ -15,10 +15,13 @@ //! Error conditions. +use std::io::Error as IoError; + use reqwest::Error as ReqwestError; use ruma_api::error::FromHttpResponseError as RumaResponseError; use ruma_api::error::IntoHttpError as RumaIntoHttpError; use ruma_client_api::Error as RumaClientError; +use serde_json::Error as JsonError; use thiserror::Error; use url::ParseError; @@ -46,6 +49,12 @@ pub enum Error { /// An error converting between ruma_client_api types and Hyper types. #[error("can't convert between ruma_client_api and hyper types.")] IntoHttp(RumaIntoHttpError), + /// An error de/serializing type for the `StateStore` + #[error(transparent)] + SerdeJson(#[from] JsonError), + /// An error de/serializing type for the `StateStore` + #[error(transparent)] + IoError(#[from] IoError), #[cfg(feature = "encryption")] /// An error occurred during a E2EE operation. #[error(transparent)] diff --git a/matrix_sdk/src/lib.rs b/matrix_sdk/src/lib.rs index d3ec1178..fcf2b61f 100644 --- a/matrix_sdk/src/lib.rs +++ b/matrix_sdk/src/lib.rs @@ -40,6 +40,7 @@ mod event_emitter; mod models; mod request_builder; mod session; +mod state; #[cfg(test)] pub mod test_builder; @@ -51,5 +52,6 @@ pub use event_emitter::EventEmitter; pub use matrix_sdk_crypto::{Device, TrustState}; pub use models::Room; pub use request_builder::{MessagesRequestBuilder, RoomBuilder}; +pub use state::{JsonStore, StateStore}; pub(crate) const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/matrix_sdk/src/models/event_deser.rs b/matrix_sdk/src/models/event_deser.rs new file mode 100644 index 00000000..f8e7ccec --- /dev/null +++ b/matrix_sdk/src/models/event_deser.rs @@ -0,0 +1,56 @@ +//! De-/serialization functions to and from json strings, allows the type to be used as a query string. + +use serde::de::{Deserialize, Deserializer, Error as _}; + +use crate::events::collections::all::Event; +use crate::events::presence::PresenceEvent; +use crate::events::EventJson; + +pub fn deserialize_events<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let mut events = vec![]; + let ev = Vec::>::deserialize(deserializer)?; + for event in ev { + events.push(event.deserialize().map_err(D::Error::custom)?); + } + + Ok(events) +} + +pub fn deserialize_presence<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let mut events = vec![]; + let ev = Vec::>::deserialize(deserializer)?; + for event in ev { + events.push(event.deserialize().map_err(D::Error::custom)?); + } + + Ok(events) +} + +#[cfg(test)] +mod test { + use std::fs; + + use crate::events::room::member::MemberEvent; + use crate::events::EventJson; + use crate::models::RoomMember; + + #[test] + fn events_and_presence_deserialization() { + let ev_json = fs::read_to_string("../test_data/events/member.json").unwrap(); + let ev = serde_json::from_str::>(&ev_json) + .unwrap() + .deserialize() + .unwrap(); + let member = RoomMember::new(&ev); + + let member_json = serde_json::to_string(&member).unwrap(); + let mem = serde_json::from_str::(&member_json).unwrap(); + assert_eq!(member, mem); + } +} diff --git a/matrix_sdk/src/models/mod.rs b/matrix_sdk/src/models/mod.rs index 07fcb00e..5e461a32 100644 --- a/matrix_sdk/src/models/mod.rs +++ b/matrix_sdk/src/models/mod.rs @@ -1,3 +1,4 @@ +mod event_deser; mod room; mod room_member; diff --git a/matrix_sdk/src/models/room.rs b/matrix_sdk/src/models/room.rs index 64c9f442..9659e11b 100644 --- a/matrix_sdk/src/models/room.rs +++ b/matrix_sdk/src/models/room.rs @@ -34,8 +34,8 @@ use crate::events::EventType; use crate::identifiers::{RoomAliasId, RoomId, UserId}; use js_int::{Int, UInt}; - -#[derive(Debug, Default, PartialEq)] +use serde::{Deserialize, Serialize}; +#[derive(Debug, Default, PartialEq, Serialize, Deserialize)] /// `RoomName` allows the calculation of a text room name. pub struct RoomName { /// The displayed name of the room. @@ -57,7 +57,7 @@ pub struct RoomName { pub invited_member_count: Option, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct PowerLevels { /// The level required to ban a user. pub ban: Int, @@ -83,7 +83,7 @@ pub struct PowerLevels { pub notifications: Int, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Tombstone { /// A server-defined message. body: String, @@ -91,7 +91,7 @@ pub struct Tombstone { replacement: RoomId, } -#[derive(Debug)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] /// A Matrix room. pub struct Room { /// The unique id of the room. diff --git a/matrix_sdk/src/models/room_member.rs b/matrix_sdk/src/models/room_member.rs index f9275728..183a61a6 100644 --- a/matrix_sdk/src/models/room_member.rs +++ b/matrix_sdk/src/models/room_member.rs @@ -24,10 +24,10 @@ use crate::events::room::{ use crate::identifiers::UserId; use js_int::{Int, UInt}; - +use serde::{Deserialize, Serialize}; // Notes: if Alice invites Bob into a room we will get an event with the sender as Alice and the state key as Bob. -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] /// A Matrix room member. /// pub struct RoomMember { @@ -58,11 +58,26 @@ pub struct RoomMember { /// The human readable name of this room member. pub name: String, /// The events that created the state of this room member. + #[serde(deserialize_with = "super::event_deser::deserialize_events")] pub events: Vec, /// The `PresenceEvent`s connected to this user. + #[serde(deserialize_with = "super::event_deser::deserialize_presence")] pub presence_events: Vec, } +impl PartialEq for RoomMember { + fn eq(&self, other: &RoomMember) -> bool { + // TODO check everything but events and presence_events they don't impl PartialEq + self.room_id == other.room_id + && self.user_id == other.user_id + && self.name == other.name + && self.display_name == other.display_name + && self.avatar_url == other.avatar_url + && self.last_active_ago == other.last_active_ago + && self.membership == other.membership + } +} + impl RoomMember { pub fn new(event: &MemberEvent) -> Self { Self { diff --git a/matrix_sdk/src/request_builder.rs b/matrix_sdk/src/request_builder.rs index 9f1db9df..a17bcf86 100644 --- a/matrix_sdk/src/request_builder.rs +++ b/matrix_sdk/src/request_builder.rs @@ -295,6 +295,7 @@ mod test { use super::*; use crate::events::room::power_levels::NotificationPowerLevels; use crate::{identifiers::RoomId, AsyncClient, Session}; + use api::r0::filter::{LazyLoadOptions, RoomEventFilter}; use js_int::Int; use mockito::{mock, Matcher}; @@ -370,7 +371,12 @@ mod test { .to("t4357353_219380_26003_2265".to_string()) .direction(Direction::Backward) .limit(UInt::new(10).unwrap()) - .filter(RoomEventFilter::default()); + .filter(RoomEventFilter { + lazy_load_options: LazyLoadOptions::Enabled { + include_redundant_members: false, + }, + ..Default::default() + }); let cli = AsyncClient::new(homeserver, Some(session)).unwrap(); assert!(cli.room_messages(builder).await.is_ok()); diff --git a/matrix_sdk/src/session.rs b/matrix_sdk/src/session.rs index 6df86eff..f091d16a 100644 --- a/matrix_sdk/src/session.rs +++ b/matrix_sdk/src/session.rs @@ -16,7 +16,6 @@ //! User sessions. use ruma_identifiers::UserId; - /// A user session, containing an access token and information about the associated user account. #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct Session { diff --git a/matrix_sdk/src/state/mod.rs b/matrix_sdk/src/state/mod.rs new file mode 100644 index 00000000..723bdb07 --- /dev/null +++ b/matrix_sdk/src/state/mod.rs @@ -0,0 +1,165 @@ +// Copyright 2020 Damir Jelić +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +pub mod state_store; +pub use state_store::JsonStore; + +use serde::{Deserialize, Serialize}; + +use crate::base_client::{Client as BaseClient, Token}; +use crate::events::push_rules::Ruleset; +use crate::identifiers::{DeviceId, RoomId, UserId}; +use crate::models::Room; +use crate::Result; + +/// `ClientState` holds all the information to restore a `BaseClient` +/// except the `access_token` as the default store is not secure. +/// +/// When implementing `StateStore` for something other than the filesystem +/// implement `From for YourDbType` this allows for easy conversion +/// when needed in `StateStore::load/store_client_state` +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct ClientState { + /// The `UserId` for the current logged in user. + pub user_id: Option, + /// The `DeviceId` of the current logged in user. + pub device_id: Option, + /// The current sync token that should be used for the next sync call. + pub sync_token: Option, + /// A list of ignored users. + pub ignored_users: Vec, + /// The push ruleset for the logged in user. + pub push_ruleset: Option, +} + +impl ClientState { + pub fn from_base_client(client: &BaseClient) -> ClientState { + let BaseClient { + session, + sync_token, + ignored_users, + push_ruleset, + .. + } = client; + Self { + user_id: session.as_ref().map(|s| s.user_id.clone()), + device_id: session.as_ref().map(|s| s.device_id.clone()), + sync_token: sync_token.clone(), + ignored_users: ignored_users.clone(), + push_ruleset: push_ruleset.clone(), + } + } +} + +/// Abstraction around the data store to avoid unnecessary request on client initialization. +#[async_trait::async_trait] +pub trait StateStore: Send + Sync { + /// Loads the state of `BaseClient` through `ClientState` type. + /// + /// An `Option::None` should be returned only if the `StateStore` tries to + /// load but no state has been stored. + async fn load_client_state(&self) -> Result>; + /// Load the state of all `Room`s. + /// + /// This will be mapped over in the client in order to store `Room`s in an async safe way. + async fn load_all_rooms(&self) -> Result>; + /// Save the current state of the `BaseClient` using the `StateStore::Store` type. + async fn store_client_state(&self, _: ClientState) -> Result<()>; + /// Save the state a single `Room`. + async fn store_room_state(&self, _: &Room) -> Result<()>; +} + +#[cfg(test)] +mod test { + use super::*; + + use std::collections::HashMap; + use std::convert::TryFrom; + + #[test] + fn serialize() { + let id = RoomId::try_from("!roomid:example.com").unwrap(); + let user = UserId::try_from("@example:example.com").unwrap(); + + let room = Room::new(&id, &user); + + let state = ClientState { + user_id: Some(user.clone()), + device_id: None, + sync_token: Some("hello".into()), + ignored_users: vec![user], + push_ruleset: None, + }; + assert_eq!( + r#"{"user_id":"@example:example.com","device_id":null,"sync_token":"hello","ignored_users":["@example:example.com"],"push_ruleset":null}"#, + serde_json::to_string(&state).unwrap() + ); + + let mut joined_rooms = HashMap::new(); + joined_rooms.insert(id, room); + assert_eq!( + r#"{ + "!roomid:example.com": { + "room_id": "!roomid:example.com", + "room_name": { + "name": null, + "canonical_alias": null, + "aliases": [], + "heroes": [], + "joined_member_count": null, + "invited_member_count": null + }, + "own_user_id": "@example:example.com", + "creator": null, + "members": {}, + "typing_users": [], + "power_levels": null, + "encrypted": false, + "unread_highlight": null, + "unread_notifications": null, + "tombstone": null + } +}"#, + serde_json::to_string_pretty(&joined_rooms).unwrap() + ); + } + + #[test] + fn deserialize() { + let id = RoomId::try_from("!roomid:example.com").unwrap(); + let user = UserId::try_from("@example:example.com").unwrap(); + + let room = Room::new(&id, &user); + + let state = ClientState { + user_id: Some(user.clone()), + device_id: None, + sync_token: Some("hello".into()), + ignored_users: vec![user], + push_ruleset: None, + }; + let json = serde_json::to_string(&state).unwrap(); + + assert_eq!(state, serde_json::from_str(&json).unwrap()); + + let mut joined_rooms = HashMap::new(); + joined_rooms.insert(id, room); + let json = serde_json::to_string(&joined_rooms).unwrap(); + + assert_eq!(joined_rooms, serde_json::from_str(&json).unwrap()); + } +} diff --git a/matrix_sdk/src/state/state_store.rs b/matrix_sdk/src/state/state_store.rs new file mode 100644 index 00000000..83761553 --- /dev/null +++ b/matrix_sdk/src/state/state_store.rs @@ -0,0 +1,305 @@ +use std::collections::HashMap; +use std::fs::{self, OpenOptions}; +use std::io::{BufReader, BufWriter, Write}; +use std::path::{Path, PathBuf}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use tokio::fs as async_fs; +use tokio::sync::RwLock; + +use super::{ClientState, StateStore}; +use crate::identifiers::RoomId; +use crate::{Error, Result, Room}; +/// A default `StateStore` implementation that serializes state as json +/// and saves it to disk. +/// +/// When logged in the `JsonStore` appends the user_id to it's folder path, +/// so all files are saved in `my_client/user_id/*`. +pub struct JsonStore { + path: Arc>, + user_path_set: AtomicBool, +} + +impl JsonStore { + /// Create a `JsonStore` to store the client and room state. + /// + /// Checks if the provided path exists and creates the directories if not. + pub fn open>(path: P) -> Result { + let p = path.as_ref(); + if !p.exists() { + fs::create_dir_all(p)?; + } + Ok(Self { + path: Arc::new(RwLock::new(p.to_path_buf())), + user_path_set: AtomicBool::new(false), + }) + } +} + +#[async_trait::async_trait] +impl StateStore for JsonStore { + async fn load_client_state(&self) -> Result> { + let mut path = self.path.read().await.clone(); + path.push("client.json"); + + let json = async_fs::read_to_string(path) + .await + .map_or(String::default(), |s| s); + if json.is_empty() { + Ok(None) + } else { + serde_json::from_str(&json).map(Some).map_err(Error::from) + } + } + + async fn load_all_rooms(&self) -> Result> { + let mut path = self.path.read().await.clone(); + path.push("rooms"); + + let mut rooms_map = HashMap::new(); + for file in fs::read_dir(&path)? { + let file = file?.path(); + + if file.is_dir() { + continue; + } + + let f_hdl = OpenOptions::new().read(true).open(&file)?; + let reader = BufReader::new(f_hdl); + + let room = serde_json::from_reader::<_, Room>(reader).map_err(Error::from)?; + let room_id = room.room_id.clone(); + + rooms_map.insert(room_id, room); + } + + Ok(rooms_map) + } + + async fn store_client_state(&self, state: ClientState) -> Result<()> { + if !self.user_path_set.load(Ordering::SeqCst) { + if let Some(user) = &state.user_id { + self.user_path_set.swap(true, Ordering::SeqCst); + self.path.write().await.push(user.localpart()) + } + } + let mut path = self.path.read().await.clone(); + path.push("client.json"); + + if !Path::new(&path).exists() { + let mut dir = path.clone(); + dir.pop(); + async_fs::create_dir_all(dir).await?; + } + + let json = serde_json::to_string(&state).map_err(Error::from)?; + + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path)?; + let mut writer = BufWriter::new(file); + writer.write_all(json.as_bytes())?; + + Ok(()) + } + + async fn store_room_state(&self, room: &Room) -> Result<()> { + if !self.user_path_set.load(Ordering::SeqCst) { + self.user_path_set.swap(true, Ordering::SeqCst); + self.path.write().await.push(room.own_user_id.localpart()) + } + + let mut path = self.path.read().await.clone(); + path.push(&format!("rooms/{}.json", room.room_id)); + + if !Path::new(&path).exists() { + let mut dir = path.clone(); + dir.pop(); + async_fs::create_dir_all(dir).await?; + } + + let json = serde_json::to_string(&room).map_err(Error::from)?; + + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path)?; + let mut writer = BufWriter::new(file); + writer.write_all(json.as_bytes())?; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::convert::TryFrom; + use std::fs; + use std::future::Future; + use std::path::PathBuf; + use std::str::FromStr; + + use lazy_static::lazy_static; + use mockito::{mock, Matcher}; + use tokio::sync::Mutex; + + use crate::identifiers::{RoomId, UserId}; + use crate::{AsyncClient, AsyncClientConfig, Session, SyncSettings}; + + lazy_static! { + /// Limit io tests to one thread at a time. + pub static ref MTX: Mutex<()> = Mutex::new(()); + } + + lazy_static! { + /// Limit io tests to one thread at a time. + pub static ref PATH: PathBuf = { + let mut path = dirs::home_dir().unwrap(); + path.push(".matrix_store"); + path + }; + } + + async fn run_and_cleanup(test: fn() -> Fut) + where + Fut: Future, + { + let _lock = MTX.lock().await; + + test().await; + + if PATH.exists() { + let path: &Path = &PATH; + fs::remove_dir_all(path).unwrap(); + } + } + + async fn test_store_client_state() { + let path: &Path = &PATH; + + let user = UserId::try_from("@example:example.com").unwrap(); + + let store = JsonStore::open(path).unwrap(); + + let state = ClientState { + user_id: Some(user.clone()), + device_id: None, + sync_token: Some("hello".into()), + ignored_users: vec![user], + push_ruleset: None, + }; + + store.store_client_state(state.clone()).await.unwrap(); + let loaded = store.load_client_state().await.unwrap(); + assert_eq!(loaded, Some(state)); + } + + #[tokio::test] + async fn store_client_state() { + run_and_cleanup(test_store_client_state).await; + } + + async fn test_store_room_state() { + let path: &Path = &PATH; + let store = JsonStore::open(path).unwrap(); + + let id = RoomId::try_from("!roomid:example.com").unwrap(); + let user = UserId::try_from("@example:example.com").unwrap(); + + let room = Room::new(&id, &user); + store.store_room_state(&room).await.unwrap(); + let loaded = store.load_all_rooms().await.unwrap(); + assert_eq!(loaded.get(&id), Some(&Room::new(&id, &user))); + } + + #[tokio::test] + async fn store_room_state() { + run_and_cleanup(test_store_room_state).await; + } + + async fn test_load_rooms() { + let path: &Path = &PATH; + let store = JsonStore::open(path).unwrap(); + + let id = RoomId::try_from("!roomid:example.com").unwrap(); + let user = UserId::try_from("@example:example.com").unwrap(); + + let room = Room::new(&id, &user); + store.store_room_state(&room).await.unwrap(); + let loaded = store.load_all_rooms().await.unwrap(); + assert_eq!(&room, loaded.get(&id).unwrap()); + } + + #[tokio::test] + async fn load_rooms() { + run_and_cleanup(test_load_rooms).await; + } + + async fn test_client_sync_store() { + let homeserver = url::Url::from_str(&mockito::server_url()).unwrap(); + + let session = Session { + access_token: "1234".to_owned(), + user_id: UserId::try_from("@cheeky_monkey:matrix.org").unwrap(), + device_id: "DEVICEID".to_owned(), + }; + + let _m = mock( + "GET", + Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), + ) + .with_status(200) + .with_body_from_file("../test_data/sync.json") + .create(); + + let _m = mock("POST", "/_matrix/client/r0/login") + .with_status(200) + .with_body_from_file("../test_data/login_response.json") + .create(); + + let path: &Path = &PATH; + // a sync response to populate our JSON store + let config = + AsyncClientConfig::default().state_store(Box::new(JsonStore::open(path).unwrap())); + let client = + AsyncClient::new_with_config(homeserver.clone(), Some(session.clone()), config) + .unwrap(); + let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); + + // gather state to save to the db, the first time through loading will be skipped + let _ = client.sync(sync_settings.clone()).await.unwrap(); + + // now syncing the client will update from the state store + let config = + AsyncClientConfig::default().state_store(Box::new(JsonStore::open(path).unwrap())); + let client = + AsyncClient::new_with_config(homeserver, Some(session.clone()), config).unwrap(); + client.sync(sync_settings).await.unwrap(); + + let base_client = client.base_client.read().await; + + // assert the synced client and the logged in client are equal + assert_eq!(base_client.session, Some(session)); + assert_eq!( + base_client.sync_token, + Some("s526_47314_0_7_1_1_1_11444_1".to_string()) + ); + assert_eq!( + base_client.ignored_users, + vec![UserId::try_from("@someone:example.org").unwrap()] + ); + } + + #[tokio::test] + async fn client_sync_store() { + run_and_cleanup(test_client_sync_store).await; + } +} diff --git a/matrix_sdk/src/test_builder.rs b/matrix_sdk/src/test_builder.rs index 75984a66..b6badf5e 100644 --- a/matrix_sdk/src/test_builder.rs +++ b/matrix_sdk/src/test_builder.rs @@ -13,9 +13,9 @@ use crate::events::{ EventJson, TryFromRaw, }; use crate::identifiers::{RoomId, UserId}; -use crate::AsyncClient; +use crate::{AsyncClient, Error, SyncSettings}; -use mockito::{self, mock, Mock}; +use mockito::{self, mock, Matcher, Mock}; use crate::models::Room; @@ -196,11 +196,7 @@ impl EventBuilder { /// /// The `TestRunner` streams the events to the client and holds methods to make assertions /// about the state of the client. - pub fn build_mock_runner>( - mut self, - method: &str, - path: P, - ) -> MockTestRunner { + pub fn build_mock_runner>(mut self, method: &str, path: P) -> MockTestRunner { let body = serde_json::json! { { "device_one_time_keys_count": {}, @@ -371,7 +367,7 @@ impl ClientTestRunner { } for event in &self.room_events { - cli.receive_joined_timeline_event(room_id, &mut EventJson::from(event)) + cli.receive_joined_timeline_event(room_id, &mut EventJson::from(event), &mut false) .await; } for event in &self.presence_events { @@ -399,11 +395,11 @@ impl MockTestRunner { self } - pub async fn to_client(&mut self) -> Result<&mut AsyncClient, crate::Error> { + pub async fn to_client(&mut self) -> Result<&mut AsyncClient, Error> { self.client .as_mut() .unwrap() - .sync(crate::SyncSettings::default()) + .sync(SyncSettings::default()) .await?; Ok(self.client.as_mut().unwrap())