diff --git a/Cargo.toml b/Cargo.toml index b5322703..ceaa9d53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ features = ["std", "std-future"] [dependencies.tokio] version = "0.2.16" default-features = false -features = ["sync", "time"] +features = ["sync", "time", "fs"] [dependencies.sqlx] version = "0.3.3" diff --git a/src/async_client.rs b/src/async_client.rs index 280768f6..36c8dfe3 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -18,7 +18,6 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::ops::Deref; -use std::path::Path; use std::result::Result as StdResult; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -159,7 +158,7 @@ impl AsyncClientConfig { /// Set a custom implementation of a `StateStore`. /// - /// The state store should be "connected/opened" before being set. + /// The state store should be opened before being set. pub fn state_store(mut self, store: Box) -> Self { self.state_store = Some(store); self @@ -610,7 +609,7 @@ impl AsyncClient { #[instrument] pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result { { - // if the client hasn't been synced from the state store don't sync again + // if the client has been synced from the state store don't sync again if !self.base_client.read().await.is_state_store_synced() { // this will bail out returning false if the store has not been set up if let Ok(synced) = self.sync_with_state_store().await { diff --git a/src/base_client.rs b/src/base_client.rs index 0b9d5f88..7f618894 100644 --- a/src/base_client.rs +++ b/src/base_client.rs @@ -148,27 +148,37 @@ impl Client { /// Returns `true` when a sync has successfully completed. pub(crate) async fn sync_with_state_store(&mut self) -> Result { if let Some(store) = self.state_store.as_ref() { - // return false and continues with a sync request then saves the state and creates - // and populates the files during the sync - if !store.initial_use().await? { + if let Some(client_state) = store.load_client_state().await? { + let ClientState { + user_id, + device_id, + sync_token, + ignored_users, + push_ruleset, + } = client_state; + + if let Some(sess) = self.session.as_mut() { + if let Some(device) = device_id { + sess.device_id = device; + } + if let Some(user) = user_id { + sess.user_id = user; + } + } + self.sync_token = sync_token; + self.ignored_users = ignored_users; + self.push_ruleset = push_ruleset; + } else { + // return false and continues with a sync request then save the state and create + // and populate the files during the sync return Ok(false); } - let ClientState { - session, - sync_token, - ignored_users, - push_ruleset, - } = store.load_client_state().await?; - let mut rooms = store.load_all_rooms().await?; + let mut rooms = store.load_all_rooms().await?; self.joined_rooms = rooms .drain() .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) .collect(); - self.session = session; - self.sync_token = sync_token; - self.ignored_users = ignored_users; - self.push_ruleset = push_ruleset; self.needs_state_store_sync = false; } diff --git a/src/request_builder.rs b/src/request_builder.rs index f9d58d14..a9893321 100644 --- a/src/request_builder.rs +++ b/src/request_builder.rs @@ -295,6 +295,7 @@ mod test { use super::*; use crate::events::room::power_levels::NotificationPowerLevels; use crate::{identifiers::RoomId, AsyncClient, Session}; + use api::r0::filter::{LazyLoadOptions, RoomEventFilter}; use js_int::Int; use mockito::{mock, Matcher}; @@ -371,9 +372,13 @@ mod test { .from("t47429-4392820_219380_26003_2265".to_string()) .to("t4357353_219380_26003_2265".to_string()) .direction(Direction::Backward) - .limit(UInt::new(10).unwrap()); - // TODO this makes ruma error `Err(IntoHttp(IntoHttpError(Query(Custom("unsupported value")))))`?? - // .filter(RoomEventFilter::default()); + .limit(UInt::new(10).unwrap()) + .filter(RoomEventFilter { + lazy_load_options: LazyLoadOptions::Enabled { + include_redundant_members: false, + }, + ..Default::default() + }); let cli = AsyncClient::new(homeserver, Some(session)).unwrap(); assert!(cli.room_messages(builder).await.is_ok()); diff --git a/src/state/mod.rs b/src/state/mod.rs index 19b1460b..c58d4d82 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -22,15 +22,15 @@ use serde::{Deserialize, Serialize}; use crate::base_client::{Client as BaseClient, Token}; use crate::events::push_rules::Ruleset; -use crate::identifiers::{RoomId, UserId}; +use crate::identifiers::{DeviceId, RoomId, UserId}; use crate::models::Room; -use crate::session::Session; use crate::Result; -#[derive(Debug, Default, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ClientState { - /// The current client session containing our user id, device id and access - /// token. - pub session: Option, + /// The `UserId` for the current logged in user. + pub user_id: Option, + /// The `DeviceId` of the current logged in user. + pub device_id: Option, /// The current sync token that should be used for the next sync call. pub sync_token: Option, /// A list of ignored users. @@ -49,7 +49,8 @@ impl ClientState { .. } = client; Self { - session: session.clone(), + user_id: session.as_ref().map(|s| s.user_id.clone()), + device_id: session.as_ref().map(|s| s.device_id.clone()), sync_token: sync_token.clone(), ignored_users: ignored_users.clone(), push_ruleset: push_ruleset.clone(), @@ -60,14 +61,11 @@ impl ClientState { /// Abstraction around the data store to avoid unnecessary request on client initialization. #[async_trait::async_trait] pub trait StateStore: Send + Sync { - /// Signals to the `AsyncClient` if this is the first time a StateStore` has been used. + /// Loads the state of `BaseClient` through `ClientState` type. /// - /// Returns true if `StateStore` has been set up and ready to be loaded from. - async fn initial_use(&self) -> Result; - /// Loads the state of `BaseClient` through `StateStore::Store` type. - async fn load_client_state(&self) -> Result; - /// Load the state of a single `Room` by `RoomId`. - async fn load_room_state(&self, room_id: &RoomId) -> Result; + /// An `Option::None` should be returned only if the `StateStore` tries to + /// load but no state has been stored. + async fn load_client_state(&self) -> Result>; /// Load the state of all `Room`s. /// /// This will be mapped over in the client in order to store `Room`s in an async safe way. @@ -85,8 +83,6 @@ mod test { use std::collections::HashMap; use std::convert::TryFrom; - use crate::identifiers::{RoomId, UserId}; - #[test] fn serialize() { let id = RoomId::try_from("!roomid:example.com").unwrap(); @@ -95,13 +91,14 @@ mod test { let room = Room::new(&id, &user); let state = ClientState { - session: None, + user_id: Some(user.clone()), + device_id: None, sync_token: Some("hello".into()), ignored_users: vec![user], push_ruleset: None, }; assert_eq!( - r#"{"session":null,"sync_token":"hello","ignored_users":["@example:example.com"],"push_ruleset":null}"#, + r#"{"user_id":"@example:example.com","device_id":null,"sync_token":"hello","ignored_users":["@example:example.com"],"push_ruleset":null}"#, serde_json::to_string(&state).unwrap() ); @@ -141,7 +138,8 @@ mod test { let room = Room::new(&id, &user); let state = ClientState { - session: None, + user_id: Some(user.clone()), + device_id: None, sync_token: Some("hello".into()), ignored_users: vec![user], push_ruleset: None, diff --git a/src/state/state_store.rs b/src/state/state_store.rs index 99f84797..87bb474c 100644 --- a/src/state/state_store.rs +++ b/src/state/state_store.rs @@ -2,14 +2,25 @@ use std::collections::HashMap; use std::fs::{self, OpenOptions}; use std::io::{BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use tokio::fs as async_fs; +use tokio::sync::RwLock; use super::{ClientState, StateStore}; use crate::identifiers::RoomId; use crate::{Error, Result, Room}; /// A default `StateStore` implementation that serializes state as json /// and saves it to disk. +/// +/// When logged in the `JsonStore` appends the user_id to it's folder path, +/// so all files are saved in `my_client/user_id/*`. pub struct JsonStore { - path: PathBuf, + path: Arc>, + user_path_set: AtomicBool, } impl JsonStore { @@ -22,39 +33,30 @@ impl JsonStore { std::fs::create_dir_all(p)?; } Ok(Self { - path: p.to_path_buf(), + path: Arc::new(RwLock::new(p.to_path_buf())), + user_path_set: AtomicBool::new(false), }) } } #[async_trait::async_trait] impl StateStore for JsonStore { - async fn initial_use(&self) -> Result { - let mut path = self.path.clone(); - path.push("client.json"); - Ok(fs::read_to_string(path).map_or(false, |s| !s.is_empty())) - } - - async fn load_client_state(&self) -> Result { - let mut path = self.path.clone(); + async fn load_client_state(&self) -> Result> { + let mut path = self.path.read().await.clone(); path.push("client.json"); - let file = OpenOptions::new().read(true).open(path)?; - let reader = BufReader::new(file); - serde_json::from_reader(reader).map_err(Error::from) - } - - async fn load_room_state(&self, room_id: &RoomId) -> Result { - let mut path = self.path.clone(); - path.push(&format!("rooms/{}.json", room_id)); - - let file = OpenOptions::new().read(true).open(path)?; - let reader = BufReader::new(file); - serde_json::from_reader(reader).map_err(Error::from) + let json = async_fs::read_to_string(path) + .await + .map_or(String::default(), |s| s); + if json.is_empty() { + Ok(None) + } else { + serde_json::from_str(&json).map(Some).map_err(Error::from) + } } async fn load_all_rooms(&self) -> Result> { - let mut path = self.path.clone(); + let mut path = self.path.read().await.clone(); path.push("rooms"); let mut rooms_map = HashMap::new(); @@ -78,7 +80,16 @@ impl StateStore for JsonStore { } async fn store_client_state(&self, state: ClientState) -> Result<()> { - let mut path = self.path.clone(); + if !self.user_path_set.load(Ordering::SeqCst) { + if let Some(user) = &state.user_id { + self.user_path_set.swap(true, Ordering::SeqCst); + self.path + .write() + .await + .push(format!("{}", user.localpart())) + } + } + let mut path = self.path.read().await.clone(); path.push("client.json"); if !Path::new(&path).exists() { @@ -101,7 +112,11 @@ impl StateStore for JsonStore { } async fn store_room_state(&self, room: &Room) -> Result<()> { - let mut path = self.path.clone(); + if !self.user_path_set.load(Ordering::SeqCst) { + // TODO Error here, should the load methods also error? + } + + let mut path = self.path.read().await.clone(); path.push(&format!("rooms/{}.json", room.room_id)); if !Path::new(&path).exists() { @@ -170,11 +185,22 @@ mod test { async fn test_store_client_state() { let path: &Path = &PATH; + + let user = UserId::try_from("@example:example.com").unwrap(); + let store = JsonStore::open(path).unwrap(); - let state = ClientState::default(); - store.store_client_state(state).await.unwrap(); + + let state = ClientState { + user_id: Some(user.clone()), + device_id: None, + sync_token: Some("hello".into()), + ignored_users: vec![user], + push_ruleset: None, + }; + + store.store_client_state(state.clone()).await.unwrap(); let loaded = store.load_client_state().await.unwrap(); - assert_eq!(loaded, ClientState::default()); + assert_eq!(loaded, Some(state)); } #[tokio::test] @@ -191,8 +217,8 @@ mod test { let room = Room::new(&id, &user); store.store_room_state(&room).await.unwrap(); - let loaded = store.load_room_state(&id).await.unwrap(); - assert_eq!(loaded, Room::new(&id, &user)); + let loaded = store.load_all_rooms().await.unwrap(); + assert_eq!(loaded.get(&id), Some(&Room::new(&id, &user))); } #[tokio::test]