state_store: add method to manually sync state store, `AsyncClient::sync` now initially syncs with state store
parent
fd7d3db32b
commit
030aa79750
|
@ -16,6 +16,7 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
use std::path::Path;
|
||||||
use std::result::Result as StdResult;
|
use std::result::Result as StdResult;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
@ -75,6 +76,9 @@ impl std::fmt::Debug for AsyncClient {
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
/// Configuration for the creation of the `AsyncClient`.
|
/// Configuration for the creation of the `AsyncClient`.
|
||||||
///
|
///
|
||||||
|
/// When setting the `StateStore` it is up to the user to open/connect
|
||||||
|
/// the storage backend before client creation.
|
||||||
|
///
|
||||||
/// # Example
|
/// # Example
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
@ -86,6 +90,14 @@ impl std::fmt::Debug for AsyncClient {
|
||||||
/// .unwrap()
|
/// .unwrap()
|
||||||
/// .disable_ssl_verification();
|
/// .disable_ssl_verification();
|
||||||
/// ```
|
/// ```
|
||||||
|
/// add the default `JsonStore` to the `AsyncClient`
|
||||||
|
/// ```no_run
|
||||||
|
/// # use matrix_sdk::{AsyncClientConfig, JsonStore};
|
||||||
|
///
|
||||||
|
/// let store = JsonStore::open("path/to/json").unwrap();
|
||||||
|
/// let client_config = AsyncClientConfig::new()
|
||||||
|
/// . state_store(Box::new(store));
|
||||||
|
/// ```
|
||||||
pub struct AsyncClientConfig {
|
pub struct AsyncClientConfig {
|
||||||
proxy: Option<reqwest::Proxy>,
|
proxy: Option<reqwest::Proxy>,
|
||||||
user_agent: Option<HeaderValue>,
|
user_agent: Option<HeaderValue>,
|
||||||
|
@ -145,7 +157,7 @@ impl AsyncClientConfig {
|
||||||
|
|
||||||
/// Set a custom implementation of a `StateStore`.
|
/// Set a custom implementation of a `StateStore`.
|
||||||
///
|
///
|
||||||
/// The state store should be "connected" before being set.
|
/// The state store should be "connected/opened" before being set.
|
||||||
pub fn state_store(mut self, store: Box<dyn StateStore>) -> Self {
|
pub fn state_store(mut self, store: Box<dyn StateStore>) -> Self {
|
||||||
self.state_store = Some(store);
|
self.state_store = Some(store);
|
||||||
self
|
self
|
||||||
|
@ -331,6 +343,18 @@ impl AsyncClient {
|
||||||
self.base_client.read().await.joined_rooms.clone()
|
self.base_client.read().await.joined_rooms.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This allows `AsyncClient` to manually sync state with the provided `StateStore`.
|
||||||
|
///
|
||||||
|
/// Returns true when a successful `StateStore` sync has completed.
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// // TODO
|
||||||
|
/// ```
|
||||||
|
pub async fn sync_with_state_store(&self) -> Result<bool> {
|
||||||
|
self.base_client.write().await.sync_with_state_store().await
|
||||||
|
}
|
||||||
|
|
||||||
/// Login to the server.
|
/// Login to the server.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
|
@ -573,11 +597,28 @@ impl AsyncClient {
|
||||||
|
|
||||||
/// Synchronize the client's state with the latest state on the server.
|
/// Synchronize the client's state with the latest state on the server.
|
||||||
///
|
///
|
||||||
|
/// If a `StateStore` is provided and this is the initial sync state will
|
||||||
|
/// be loaded from the state store.
|
||||||
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// * `sync_settings` - Settings for the sync call.
|
/// * `sync_settings` - Settings for the sync call.
|
||||||
#[instrument]
|
#[instrument]
|
||||||
pub async fn sync(&self, sync_settings: SyncSettings) -> Result<sync_events::IncomingResponse> {
|
pub async fn sync(
|
||||||
|
&self,
|
||||||
|
mut sync_settings: SyncSettings,
|
||||||
|
) -> Result<sync_events::IncomingResponse> {
|
||||||
|
{
|
||||||
|
if self.base_client.read().await.is_state_store_synced() {
|
||||||
|
if let Ok(synced) = self.sync_with_state_store().await {
|
||||||
|
if synced {
|
||||||
|
// once synced, update the sync token to the last known state from `StateStore`.
|
||||||
|
sync_settings.token = self.sync_token().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let request = sync_events::Request {
|
let request = sync_events::Request {
|
||||||
filter: None,
|
filter: None,
|
||||||
since: sync_settings.token,
|
since: sync_settings.token,
|
||||||
|
|
|
@ -78,6 +78,8 @@ pub struct Client {
|
||||||
pub event_emitter: Option<Box<dyn EventEmitter>>,
|
pub event_emitter: Option<Box<dyn EventEmitter>>,
|
||||||
///
|
///
|
||||||
pub state_store: Option<Box<dyn StateStore>>,
|
pub state_store: Option<Box<dyn StateStore>>,
|
||||||
|
/// Does the `Client` need to sync with the state store.
|
||||||
|
needs_state_store_sync: bool,
|
||||||
|
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
olm: Arc<Mutex<Option<OlmMachine>>>,
|
olm: Arc<Mutex<Option<OlmMachine>>>,
|
||||||
|
@ -118,6 +120,7 @@ impl Client {
|
||||||
push_ruleset: None,
|
push_ruleset: None,
|
||||||
event_emitter: None,
|
event_emitter: None,
|
||||||
state_store: None,
|
state_store: None,
|
||||||
|
needs_state_store_sync: true,
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
olm: Arc::new(Mutex::new(olm)),
|
olm: Arc::new(Mutex::new(olm)),
|
||||||
})
|
})
|
||||||
|
@ -135,6 +138,39 @@ impl Client {
|
||||||
self.event_emitter = Some(emitter);
|
self.event_emitter = Some(emitter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if the state store has been loaded into the client.
|
||||||
|
pub fn is_state_store_synced(&self) -> bool {
|
||||||
|
!self.needs_state_store_sync
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When a client is provided the state store will load state from the `StateStore`.
|
||||||
|
///
|
||||||
|
/// Returns `true` when a sync has successfully completed.
|
||||||
|
pub(crate) async fn sync_with_state_store(&mut self) -> Result<bool> {
|
||||||
|
if let Some(store) = self.state_store.as_ref() {
|
||||||
|
let ClientState {
|
||||||
|
session,
|
||||||
|
sync_token,
|
||||||
|
ignored_users,
|
||||||
|
push_ruleset,
|
||||||
|
} = store.load_client_state().await?;
|
||||||
|
let mut rooms = store.load_all_rooms().await?;
|
||||||
|
|
||||||
|
self.joined_rooms = rooms
|
||||||
|
.drain()
|
||||||
|
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
|
||||||
|
.collect();
|
||||||
|
self.session = session;
|
||||||
|
self.sync_token = sync_token;
|
||||||
|
self.ignored_users = ignored_users;
|
||||||
|
self.push_ruleset = push_ruleset;
|
||||||
|
|
||||||
|
self.needs_state_store_sync = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(!self.needs_state_store_sync)
|
||||||
|
}
|
||||||
|
|
||||||
/// Receive a login response and update the session of the client.
|
/// Receive a login response and update the session of the client.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
|
@ -158,25 +194,6 @@ impl Client {
|
||||||
*olm = Some(OlmMachine::new(&response.user_id, &response.device_id)?);
|
*olm = Some(OlmMachine::new(&response.user_id, &response.device_id)?);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(store) = self.state_store.as_ref() {
|
|
||||||
let ClientState {
|
|
||||||
session,
|
|
||||||
sync_token,
|
|
||||||
ignored_users,
|
|
||||||
push_ruleset,
|
|
||||||
} = store.load_client_state().await?;
|
|
||||||
let mut rooms = store.load_all_rooms().await?;
|
|
||||||
|
|
||||||
self.joined_rooms = rooms
|
|
||||||
.drain()
|
|
||||||
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
|
|
||||||
.collect();
|
|
||||||
self.session = session;
|
|
||||||
self.sync_token = sync_token;
|
|
||||||
self.ignored_users = ignored_users;
|
|
||||||
self.push_ruleset = push_ruleset;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -242,16 +242,18 @@ mod test {
|
||||||
AsyncClient::new_with_config(homeserver.clone(), Some(session.clone()), config)
|
AsyncClient::new_with_config(homeserver.clone(), Some(session.clone()), config)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000));
|
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000));
|
||||||
let _ = client.sync(sync_settings).await.unwrap();
|
// fake a sync to skip the load with the state store, this will fail as the files won't exist
|
||||||
|
// but the `AsyncClient::sync` will skip `StateStore::load_*`
|
||||||
|
assert!(client.sync_with_state_store().await.is_err());
|
||||||
|
// gather state to save to the db
|
||||||
|
let _ = client.sync(sync_settings.clone()).await.unwrap();
|
||||||
|
|
||||||
// once logged in without syncing the client is updated from the state store
|
// now syncing the client will update from the state store
|
||||||
let config =
|
let config =
|
||||||
AsyncClientConfig::default().state_store(Box::new(JsonStore::open(path).unwrap()));
|
AsyncClientConfig::default().state_store(Box::new(JsonStore::open(path).unwrap()));
|
||||||
let client = AsyncClient::new_with_config(homeserver, None, config).unwrap();
|
let client =
|
||||||
client
|
AsyncClient::new_with_config(homeserver, Some(session.clone()), config).unwrap();
|
||||||
.login("example", "wordpass", None, None)
|
client.sync(sync_settings).await.unwrap();
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let base_client = client.base_client.read().await;
|
let base_client = client.base_client.read().await;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue