From 030aa7975043b2915f1989316c1ee6349dc2754a Mon Sep 17 00:00:00 2001 From: Devin R Date: Thu, 23 Apr 2020 19:37:27 -0400 Subject: [PATCH] state_store: add method to manually sync state store, `AsyncClient::sync` now initially syncs with state store --- src/async_client.rs | 45 ++++++++++++++++++++++++++++++-- src/base_client.rs | 55 ++++++++++++++++++++++++++-------------- src/state/state_store.rs | 16 +++++++----- 3 files changed, 88 insertions(+), 28 deletions(-) diff --git a/src/async_client.rs b/src/async_client.rs index aae5f5d8..9321bfcc 100644 --- a/src/async_client.rs +++ b/src/async_client.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::ops::Deref; +use std::path::Path; use std::result::Result as StdResult; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -75,6 +76,9 @@ impl std::fmt::Debug for AsyncClient { #[derive(Default)] /// Configuration for the creation of the `AsyncClient`. /// +/// When setting the `StateStore` it is up to the user to open/connect +/// the storage backend before client creation. +/// /// # Example /// /// ``` @@ -86,6 +90,14 @@ impl std::fmt::Debug for AsyncClient { /// .unwrap() /// .disable_ssl_verification(); /// ``` +/// add the default `JsonStore` to the `AsyncClient` +/// ```no_run +/// # use matrix_sdk::{AsyncClientConfig, JsonStore}; +/// +/// let store = JsonStore::open("path/to/json").unwrap(); +/// let client_config = AsyncClientConfig::new() +/// . state_store(Box::new(store)); +/// ``` pub struct AsyncClientConfig { proxy: Option, user_agent: Option, @@ -145,7 +157,7 @@ impl AsyncClientConfig { /// Set a custom implementation of a `StateStore`. /// - /// The state store should be "connected" before being set. + /// The state store should be "connected/opened" before being set. pub fn state_store(mut self, store: Box) -> Self { self.state_store = Some(store); self @@ -331,6 +343,18 @@ impl AsyncClient { self.base_client.read().await.joined_rooms.clone() } + /// This allows `AsyncClient` to manually sync state with the provided `StateStore`. + /// + /// Returns true when a successful `StateStore` sync has completed. + /// # Examples + /// + /// ``` + /// // TODO + /// ``` + pub async fn sync_with_state_store(&self) -> Result { + self.base_client.write().await.sync_with_state_store().await + } + /// Login to the server. /// /// # Arguments @@ -573,11 +597,28 @@ impl AsyncClient { /// Synchronize the client's state with the latest state on the server. /// + /// If a `StateStore` is provided and this is the initial sync state will + /// be loaded from the state store. + /// /// # Arguments /// /// * `sync_settings` - Settings for the sync call. #[instrument] - pub async fn sync(&self, sync_settings: SyncSettings) -> Result { + pub async fn sync( + &self, + mut sync_settings: SyncSettings, + ) -> Result { + { + if self.base_client.read().await.is_state_store_synced() { + if let Ok(synced) = self.sync_with_state_store().await { + if synced { + // once synced, update the sync token to the last known state from `StateStore`. + sync_settings.token = self.sync_token().await; + } + } + } + } + let request = sync_events::Request { filter: None, since: sync_settings.token, diff --git a/src/base_client.rs b/src/base_client.rs index f1d04759..b10a5601 100644 --- a/src/base_client.rs +++ b/src/base_client.rs @@ -78,6 +78,8 @@ pub struct Client { pub event_emitter: Option>, /// pub state_store: Option>, + /// Does the `Client` need to sync with the state store. + needs_state_store_sync: bool, #[cfg(feature = "encryption")] olm: Arc>>, @@ -118,6 +120,7 @@ impl Client { push_ruleset: None, event_emitter: None, state_store: None, + needs_state_store_sync: true, #[cfg(feature = "encryption")] olm: Arc::new(Mutex::new(olm)), }) @@ -135,6 +138,39 @@ impl Client { self.event_emitter = Some(emitter); } + /// Returns true if the state store has been loaded into the client. + pub fn is_state_store_synced(&self) -> bool { + !self.needs_state_store_sync + } + + /// When a client is provided the state store will load state from the `StateStore`. + /// + /// Returns `true` when a sync has successfully completed. + pub(crate) async fn sync_with_state_store(&mut self) -> Result { + if let Some(store) = self.state_store.as_ref() { + let ClientState { + session, + sync_token, + ignored_users, + push_ruleset, + } = store.load_client_state().await?; + let mut rooms = store.load_all_rooms().await?; + + self.joined_rooms = rooms + .drain() + .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) + .collect(); + self.session = session; + self.sync_token = sync_token; + self.ignored_users = ignored_users; + self.push_ruleset = push_ruleset; + + self.needs_state_store_sync = false; + } + + Ok(!self.needs_state_store_sync) + } + /// Receive a login response and update the session of the client. /// /// # Arguments @@ -158,25 +194,6 @@ impl Client { *olm = Some(OlmMachine::new(&response.user_id, &response.device_id)?); } - if let Some(store) = self.state_store.as_ref() { - let ClientState { - session, - sync_token, - ignored_users, - push_ruleset, - } = store.load_client_state().await?; - let mut rooms = store.load_all_rooms().await?; - - self.joined_rooms = rooms - .drain() - .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) - .collect(); - self.session = session; - self.sync_token = sync_token; - self.ignored_users = ignored_users; - self.push_ruleset = push_ruleset; - } - Ok(()) } diff --git a/src/state/state_store.rs b/src/state/state_store.rs index 98d2f7c3..964c4e84 100644 --- a/src/state/state_store.rs +++ b/src/state/state_store.rs @@ -242,16 +242,18 @@ mod test { AsyncClient::new_with_config(homeserver.clone(), Some(session.clone()), config) .unwrap(); let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); - let _ = client.sync(sync_settings).await.unwrap(); + // fake a sync to skip the load with the state store, this will fail as the files won't exist + // but the `AsyncClient::sync` will skip `StateStore::load_*` + assert!(client.sync_with_state_store().await.is_err()); + // gather state to save to the db + let _ = client.sync(sync_settings.clone()).await.unwrap(); - // once logged in without syncing the client is updated from the state store + // now syncing the client will update from the state store let config = AsyncClientConfig::default().state_store(Box::new(JsonStore::open(path).unwrap())); - let client = AsyncClient::new_with_config(homeserver, None, config).unwrap(); - client - .login("example", "wordpass", None, None) - .await - .unwrap(); + let client = + AsyncClient::new_with_config(homeserver, Some(session.clone()), config).unwrap(); + client.sync(sync_settings).await.unwrap(); let base_client = client.base_client.read().await;