From ef560fd545681eab95f50e1eb3116636e97c982b Mon Sep 17 00:00:00 2001 From: Devin R Date: Wed, 29 Apr 2020 07:00:14 -0400 Subject: [PATCH] state_store: load_client takes a sessions obj, docs, copyright, move state store into base client --- matrix_sdk/src/async_client.rs | 6 +- matrix_sdk/src/base_client.rs | 96 ++++++++++++++++------------- matrix_sdk/src/state/mod.rs | 7 +-- matrix_sdk/src/state/state_store.rs | 50 ++++++++------- matrix_sdk/src/test_builder.rs | 2 +- 5 files changed, 90 insertions(+), 71 deletions(-) diff --git a/matrix_sdk/src/async_client.rs b/matrix_sdk/src/async_client.rs index 76f639a6..0516c815 100644 --- a/matrix_sdk/src/async_client.rs +++ b/matrix_sdk/src/async_client.rs @@ -675,8 +675,8 @@ impl AsyncClient { let decrypted_event = { let mut client = self.base_client.write().await; let mut timeline_update = false; - let decrypt_ev = client - .receive_joined_timeline_event(room_id, &mut event, &mut timeline_update) + let (decrypt_ev, timeline_update) = client + .receive_joined_timeline_event(room_id, &mut event) .await; if timeline_update { updated = true; @@ -746,7 +746,7 @@ impl AsyncClient { } let mut client = self.base_client.write().await; - client.receive_sync_response(&mut response).await; + client.receive_sync_response(&mut response, updated).await?; if updated { if let Some(store) = client.state_store.as_ref() { diff --git a/matrix_sdk/src/base_client.rs b/matrix_sdk/src/base_client.rs index 0f17dca7..ac2d5cce 100644 --- a/matrix_sdk/src/base_client.rs +++ b/matrix_sdk/src/base_client.rs @@ -76,7 +76,10 @@ pub struct Client { /// Any implementor of EventEmitter will act as the callbacks for various /// events. pub event_emitter: Option>, + /// Any implementor of `StateStore` will be called to save `Room` and + /// some `BaseClient` state during `AsyncClient::sync` calls. /// + /// There is a default implementation `JsonStore` that saves JSON to disk. pub state_store: Option>, /// Does the `Client` need to sync with the state store. needs_state_store_sync: bool, @@ -148,41 +151,42 @@ impl Client { /// Returns `true` when a sync has successfully completed. pub(crate) async fn sync_with_state_store(&mut self) -> Result { if let Some(store) = self.state_store.as_ref() { - if let Some(client_state) = store.load_client_state().await? { - let ClientState { - user_id, - device_id, - sync_token, - ignored_users, - push_ruleset, - } = client_state; + if let Some(sess) = self.session.as_ref() { + if let Some(client_state) = store.load_client_state(sess).await? { + let ClientState { + user_id, + device_id, + sync_token, + ignored_users, + push_ruleset, + } = client_state; - if let Some(sess) = self.session.as_mut() { - if let Some(device) = device_id { - sess.device_id = device; - } - if let Some(user) = user_id { - sess.user_id = user; + if let Some(sess) = self.session.as_mut() { + if let Some(device) = device_id { + sess.device_id = device; + } + if let Some(user) = user_id { + sess.user_id = user; + } } + self.sync_token = sync_token; + self.ignored_users = ignored_users; + self.push_ruleset = push_ruleset; + } else { + // return false and continues with a sync request then save the state and create + // and populate the files during the sync + return Ok(false); } - self.sync_token = sync_token; - self.ignored_users = ignored_users; - self.push_ruleset = push_ruleset; - } else { - // return false and continues with a sync request then save the state and create - // and populate the files during the sync - return Ok(false); + + let mut rooms = store.load_all_rooms().await?; + self.joined_rooms = rooms + .drain() + .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) + .collect(); + + self.needs_state_store_sync = false; } - - let mut rooms = store.load_all_rooms().await?; - self.joined_rooms = rooms - .drain() - .map(|(k, room)| (k, Arc::new(RwLock::new(room)))) - .collect(); - - self.needs_state_store_sync = false; } - Ok(!self.needs_state_store_sync) } @@ -278,23 +282,19 @@ impl Client { /// Receive a timeline event for a joined room and update the client state. /// - /// If the event was a encrypted room event and decryption was successful - /// the decrypted event will be returned, otherwise None. + /// Returns a tuple of the successfully decrypted event, or None on failure and + /// a bool, true when the `Room` state has been updated. /// /// # Arguments /// /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - /// - /// * `did_update` - This is used internally to confirm when the state has - /// been updated. pub async fn receive_joined_timeline_event( &mut self, room_id: &RoomId, event: &mut EventJson, - did_update: &mut bool, - ) -> Option> { + ) -> (Option>, bool) { match event.deserialize() { #[allow(unused_mut)] Ok(mut e) => { @@ -319,11 +319,9 @@ impl Client { } let mut room = self.get_or_create_room(&room_id).write().await; - // TODO is passing in the bool to use in `AsyncClient::sync` ok here - *did_update = room.receive_timeline_event(&e); - decrypted_event + (decrypted_event, room.receive_timeline_event(&e)) } - _ => None, + _ => (None, false), } } @@ -419,7 +417,13 @@ impl Client { /// # Arguments /// /// * `response` - The response that we received after a successful sync. - pub async fn receive_sync_response(&mut self, response: &mut api::sync::sync_events::Response) { + /// + /// * `did_update` - Signals to the `StateStore` if the client state needs updating. + pub async fn receive_sync_response( + &mut self, + response: &mut api::sync::sync_events::Response, + did_update: bool, + ) -> Result<()> { self.sync_token = Some(response.next_batch.clone()); #[cfg(feature = "encryption")] @@ -442,6 +446,14 @@ impl Client { } } } + + if did_update { + if let Some(store) = self.state_store.as_ref() { + let state = ClientState::from_base_client(&self); + store.store_client_state(state).await?; + } + } + Ok(()) } /// Should account or one-time keys be uploaded to the server. diff --git a/matrix_sdk/src/state/mod.rs b/matrix_sdk/src/state/mod.rs index 723bdb07..3ca6ea7e 100644 --- a/matrix_sdk/src/state/mod.rs +++ b/matrix_sdk/src/state/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2020 Damir Jelić +// Copyright 2020 Devin Ragotzy // Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -23,8 +23,7 @@ use serde::{Deserialize, Serialize}; use crate::base_client::{Client as BaseClient, Token}; use crate::events::push_rules::Ruleset; use crate::identifiers::{DeviceId, RoomId, UserId}; -use crate::models::Room; -use crate::Result; +use crate::{Result, Room, Session}; /// `ClientState` holds all the information to restore a `BaseClient` /// except the `access_token` as the default store is not secure. @@ -72,7 +71,7 @@ pub trait StateStore: Send + Sync { /// /// An `Option::None` should be returned only if the `StateStore` tries to /// load but no state has been stored. - async fn load_client_state(&self) -> Result>; + async fn load_client_state(&self, _: &Session) -> Result>; /// Load the state of all `Room`s. /// /// This will be mapped over in the client in order to store `Room`s in an async safe way. diff --git a/matrix_sdk/src/state/state_store.rs b/matrix_sdk/src/state/state_store.rs index 83761553..9e8d33fc 100644 --- a/matrix_sdk/src/state/state_store.rs +++ b/matrix_sdk/src/state/state_store.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use std::fs::{self, OpenOptions}; -use std::io::{BufReader, BufWriter, Write}; +use std::fs; use std::path::{Path, PathBuf}; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -8,16 +7,17 @@ use std::sync::{ }; use tokio::fs as async_fs; +use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; use super::{ClientState, StateStore}; use crate::identifiers::RoomId; -use crate::{Error, Result, Room}; +use crate::{Error, Result, Room, Session}; /// A default `StateStore` implementation that serializes state as json /// and saves it to disk. /// /// When logged in the `JsonStore` appends the user_id to it's folder path, -/// so all files are saved in `my_client/user_id/*`. +/// so all files are saved in `my_client/user_id_localpart/*`. pub struct JsonStore { path: Arc>, user_path_set: AtomicBool, @@ -41,7 +41,12 @@ impl JsonStore { #[async_trait::async_trait] impl StateStore for JsonStore { - async fn load_client_state(&self) -> Result> { + async fn load_client_state(&self, sess: &Session) -> Result> { + if !self.user_path_set.load(Ordering::SeqCst) { + self.user_path_set.swap(true, Ordering::SeqCst); + self.path.write().await.push(sess.user_id.localpart()) + } + let mut path = self.path.read().await.clone(); path.push("client.json"); @@ -67,10 +72,9 @@ impl StateStore for JsonStore { continue; } - let f_hdl = OpenOptions::new().read(true).open(&file)?; - let reader = BufReader::new(f_hdl); + let json = async_fs::read_to_string(&file).await?; - let room = serde_json::from_reader::<_, Room>(reader).map_err(Error::from)?; + let room = serde_json::from_str::(&json).map_err(Error::from)?; let room_id = room.room_id.clone(); rooms_map.insert(room_id, room); @@ -97,15 +101,13 @@ impl StateStore for JsonStore { let json = serde_json::to_string(&state).map_err(Error::from)?; - let file = OpenOptions::new() + let mut file = async_fs::OpenOptions::new() .write(true) .create(true) .truncate(true) - .open(path)?; - let mut writer = BufWriter::new(file); - writer.write_all(json.as_bytes())?; - - Ok(()) + .open(path) + .await?; + file.write_all(json.as_bytes()).await.map_err(Error::from) } async fn store_room_state(&self, room: &Room) -> Result<()> { @@ -125,15 +127,13 @@ impl StateStore for JsonStore { let json = serde_json::to_string(&room).map_err(Error::from)?; - let file = OpenOptions::new() + let mut file = async_fs::OpenOptions::new() .write(true) .create(true) .truncate(true) - .open(path)?; - let mut writer = BufWriter::new(file); - writer.write_all(json.as_bytes())?; - - Ok(()) + .open(path) + .await?; + file.write_all(json.as_bytes()).await.map_err(Error::from) } } @@ -187,6 +187,12 @@ mod test { let user = UserId::try_from("@example:example.com").unwrap(); + let sess = Session { + access_token: "32nj9zu034btz90".to_string(), + user_id: user.clone(), + device_id: "Tester".to_string(), + }; + let store = JsonStore::open(path).unwrap(); let state = ClientState { @@ -198,7 +204,9 @@ mod test { }; store.store_client_state(state.clone()).await.unwrap(); - let loaded = store.load_client_state().await.unwrap(); + + let store = JsonStore::open(path).unwrap(); + let loaded = store.load_client_state(&sess).await.unwrap(); assert_eq!(loaded, Some(state)); } diff --git a/matrix_sdk/src/test_builder.rs b/matrix_sdk/src/test_builder.rs index b6badf5e..1b18c8f3 100644 --- a/matrix_sdk/src/test_builder.rs +++ b/matrix_sdk/src/test_builder.rs @@ -367,7 +367,7 @@ impl ClientTestRunner { } for event in &self.room_events { - cli.receive_joined_timeline_event(room_id, &mut EventJson::from(event), &mut false) + cli.receive_joined_timeline_event(room_id, &mut EventJson::from(event)) .await; } for event in &self.presence_events {