base: Move the state store behind a lock.
parent
6e9e817597
commit
02013ac286
|
@ -285,10 +285,10 @@ impl AsyncClient {
|
||||||
|
|
||||||
let http_client = http_client.default_headers(headers).build()?;
|
let http_client = http_client.default_headers(headers).build()?;
|
||||||
|
|
||||||
let mut base_client = BaseClient::new(session)?;
|
let base_client = if let Some(store) = config.state_store {
|
||||||
|
BaseClient::new_with_state_store(session, store)?
|
||||||
if let Some(store) = config.state_store {
|
} else {
|
||||||
base_client.state_store = Some(store);
|
BaseClient::new(session)?
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
|
|
@ -82,7 +82,7 @@ pub struct Client {
|
||||||
/// some `BaseClient` state during `AsyncClient::sync` calls.
|
/// some `BaseClient` state during `AsyncClient::sync` calls.
|
||||||
///
|
///
|
||||||
/// There is a default implementation `JsonStore` that saves JSON to disk.
|
/// There is a default implementation `JsonStore` that saves JSON to disk.
|
||||||
pub state_store: Option<Box<dyn StateStore>>,
|
pub state_store: Arc<RwLock<Option<Box<dyn StateStore>>>>,
|
||||||
/// Does the `Client` need to sync with the state store.
|
/// Does the `Client` need to sync with the state store.
|
||||||
needs_state_store_sync: Arc<AtomicBool>,
|
needs_state_store_sync: Arc<AtomicBool>,
|
||||||
|
|
||||||
|
@ -111,6 +111,26 @@ impl Client {
|
||||||
/// * `session` - An optional session if the user already has one from a
|
/// * `session` - An optional session if the user already has one from a
|
||||||
/// previous login call.
|
/// previous login call.
|
||||||
pub fn new(session: Option<Session>) -> Result<Self> {
|
pub fn new(session: Option<Session>) -> Result<Self> {
|
||||||
|
Client::new_helper(session, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new client.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `session` - An optional session if the user already has one from a
|
||||||
|
/// previous login call.
|
||||||
|
///
|
||||||
|
/// * `store` - An open state store implementation that will be used through
|
||||||
|
/// the lifetime of the client.
|
||||||
|
pub fn new_with_state_store(
|
||||||
|
session: Option<Session>,
|
||||||
|
store: Box<dyn StateStore>,
|
||||||
|
) -> Result<Self> {
|
||||||
|
Client::new_helper(session, Some(store))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_helper(session: Option<Session>, store: Option<Box<dyn StateStore>>) -> Result<Self> {
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
let olm = match &session {
|
let olm = match &session {
|
||||||
Some(s) => Some(OlmMachine::new(&s.user_id, &s.device_id)),
|
Some(s) => Some(OlmMachine::new(&s.user_id, &s.device_id)),
|
||||||
|
@ -124,7 +144,7 @@ impl Client {
|
||||||
ignored_users: Arc::new(RwLock::new(Vec::new())),
|
ignored_users: Arc::new(RwLock::new(Vec::new())),
|
||||||
push_ruleset: Arc::new(RwLock::new(None)),
|
push_ruleset: Arc::new(RwLock::new(None)),
|
||||||
event_emitter: Arc::new(RwLock::new(None)),
|
event_emitter: Arc::new(RwLock::new(None)),
|
||||||
state_store: None,
|
state_store: Arc::new(RwLock::new(store)),
|
||||||
needs_state_store_sync: Arc::new(AtomicBool::from(true)),
|
needs_state_store_sync: Arc::new(AtomicBool::from(true)),
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
olm: Arc::new(Mutex::new(olm)),
|
olm: Arc::new(Mutex::new(olm)),
|
||||||
|
@ -154,7 +174,8 @@ impl Client {
|
||||||
///
|
///
|
||||||
/// Returns `true` when a state store sync has successfully completed.
|
/// Returns `true` when a state store sync has successfully completed.
|
||||||
pub(crate) async fn sync_with_state_store(&mut self) -> Result<bool> {
|
pub(crate) async fn sync_with_state_store(&mut self) -> Result<bool> {
|
||||||
if let Some(store) = self.state_store.as_ref() {
|
let store = self.state_store.read().await;
|
||||||
|
if let Some(store) = store.as_ref() {
|
||||||
if let Some(sess) = self.session.read().await.as_ref() {
|
if let Some(sess) = self.session.read().await.as_ref() {
|
||||||
if let Some(client_state) = store.load_client_state(sess).await? {
|
if let Some(client_state) = store.load_client_state(sess).await? {
|
||||||
let ClientState {
|
let ClientState {
|
||||||
|
@ -523,7 +544,9 @@ impl Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
if updated {
|
if updated {
|
||||||
if let Some(store) = self.state_store.as_ref() {
|
let store = self.state_store.read().await;
|
||||||
|
|
||||||
|
if let Some(store) = store.as_ref() {
|
||||||
store
|
store
|
||||||
.store_room_state(matrix_room.read().await.deref())
|
.store_room_state(matrix_room.read().await.deref())
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -532,7 +555,9 @@ impl Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
if updated {
|
if updated {
|
||||||
if let Some(store) = self.state_store.as_ref() {
|
let store = self.state_store.read().await;
|
||||||
|
|
||||||
|
if let Some(store) = store.as_ref() {
|
||||||
let state = ClientState::from_base_client(&self).await;
|
let state = ClientState::from_base_client(&self).await;
|
||||||
store.store_client_state(state).await?;
|
store.store_client_state(state).await?;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue