state_store: add user_id localpart to state store path from session

master
Devin R 2020-04-24 19:57:51 -04:00
parent 316295bb77
commit 23755b10ab
6 changed files with 108 additions and 70 deletions

View File

@ -52,7 +52,7 @@ features = ["std", "std-future"]
[dependencies.tokio]
version = "0.2.16"
default-features = false
features = ["sync", "time"]
features = ["sync", "time", "fs"]
[dependencies.sqlx]
version = "0.3.3"

View File

@ -18,7 +18,6 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::ops::Deref;
use std::path::Path;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -159,7 +158,7 @@ impl AsyncClientConfig {
/// Set a custom implementation of a `StateStore`.
///
/// The state store should be "connected/opened" before being set.
/// The state store should be opened before being set.
pub fn state_store(mut self, store: Box<dyn StateStore>) -> Self {
self.state_store = Some(store);
self
@ -610,7 +609,7 @@ impl AsyncClient {
#[instrument]
pub async fn sync(&self, mut sync_settings: SyncSettings) -> Result<sync_events::Response> {
{
// if the client hasn't been synced from the state store don't sync again
// if the client has been synced from the state store don't sync again
if !self.base_client.read().await.is_state_store_synced() {
// this will bail out returning false if the store has not been set up
if let Ok(synced) = self.sync_with_state_store().await {

View File

@ -148,27 +148,37 @@ impl Client {
/// Returns `true` when a sync has successfully completed.
pub(crate) async fn sync_with_state_store(&mut self) -> Result<bool> {
if let Some(store) = self.state_store.as_ref() {
// return false and continues with a sync request then saves the state and creates
// and populates the files during the sync
if !store.initial_use().await? {
if let Some(client_state) = store.load_client_state().await? {
let ClientState {
user_id,
device_id,
sync_token,
ignored_users,
push_ruleset,
} = client_state;
if let Some(sess) = self.session.as_mut() {
if let Some(device) = device_id {
sess.device_id = device;
}
if let Some(user) = user_id {
sess.user_id = user;
}
}
self.sync_token = sync_token;
self.ignored_users = ignored_users;
self.push_ruleset = push_ruleset;
} else {
// return false and continues with a sync request then save the state and create
// and populate the files during the sync
return Ok(false);
}
let ClientState {
session,
sync_token,
ignored_users,
push_ruleset,
} = store.load_client_state().await?;
let mut rooms = store.load_all_rooms().await?;
let mut rooms = store.load_all_rooms().await?;
self.joined_rooms = rooms
.drain()
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
.collect();
self.session = session;
self.sync_token = sync_token;
self.ignored_users = ignored_users;
self.push_ruleset = push_ruleset;
self.needs_state_store_sync = false;
}

View File

@ -295,6 +295,7 @@ mod test {
use super::*;
use crate::events::room::power_levels::NotificationPowerLevels;
use crate::{identifiers::RoomId, AsyncClient, Session};
use api::r0::filter::{LazyLoadOptions, RoomEventFilter};
use js_int::Int;
use mockito::{mock, Matcher};
@ -371,9 +372,13 @@ mod test {
.from("t47429-4392820_219380_26003_2265".to_string())
.to("t4357353_219380_26003_2265".to_string())
.direction(Direction::Backward)
.limit(UInt::new(10).unwrap());
// TODO this makes ruma error `Err(IntoHttp(IntoHttpError(Query(Custom("unsupported value")))))`??
// .filter(RoomEventFilter::default());
.limit(UInt::new(10).unwrap())
.filter(RoomEventFilter {
lazy_load_options: LazyLoadOptions::Enabled {
include_redundant_members: false,
},
..Default::default()
});
let cli = AsyncClient::new(homeserver, Some(session)).unwrap();
assert!(cli.room_messages(builder).await.is_ok());

View File

@ -22,15 +22,15 @@ use serde::{Deserialize, Serialize};
use crate::base_client::{Client as BaseClient, Token};
use crate::events::push_rules::Ruleset;
use crate::identifiers::{RoomId, UserId};
use crate::identifiers::{DeviceId, RoomId, UserId};
use crate::models::Room;
use crate::session::Session;
use crate::Result;
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClientState {
/// The current client session containing our user id, device id and access
/// token.
pub session: Option<Session>,
/// The `UserId` for the current logged in user.
pub user_id: Option<UserId>,
/// The `DeviceId` of the current logged in user.
pub device_id: Option<DeviceId>,
/// The current sync token that should be used for the next sync call.
pub sync_token: Option<Token>,
/// A list of ignored users.
@ -49,7 +49,8 @@ impl ClientState {
..
} = client;
Self {
session: session.clone(),
user_id: session.as_ref().map(|s| s.user_id.clone()),
device_id: session.as_ref().map(|s| s.device_id.clone()),
sync_token: sync_token.clone(),
ignored_users: ignored_users.clone(),
push_ruleset: push_ruleset.clone(),
@ -60,14 +61,11 @@ impl ClientState {
/// Abstraction around the data store to avoid unnecessary request on client initialization.
#[async_trait::async_trait]
pub trait StateStore: Send + Sync {
/// Signals to the `AsyncClient` if this is the first time a StateStore` has been used.
/// Loads the state of `BaseClient` through `ClientState` type.
///
/// Returns true if `StateStore` has been set up and ready to be loaded from.
async fn initial_use(&self) -> Result<bool>;
/// Loads the state of `BaseClient` through `StateStore::Store` type.
async fn load_client_state(&self) -> Result<ClientState>;
/// Load the state of a single `Room` by `RoomId`.
async fn load_room_state(&self, room_id: &RoomId) -> Result<Room>;
/// An `Option::None` should be returned only if the `StateStore` tries to
/// load but no state has been stored.
async fn load_client_state(&self) -> Result<Option<ClientState>>;
/// Load the state of all `Room`s.
///
/// This will be mapped over in the client in order to store `Room`s in an async safe way.
@ -85,8 +83,6 @@ mod test {
use std::collections::HashMap;
use std::convert::TryFrom;
use crate::identifiers::{RoomId, UserId};
#[test]
fn serialize() {
let id = RoomId::try_from("!roomid:example.com").unwrap();
@ -95,13 +91,14 @@ mod test {
let room = Room::new(&id, &user);
let state = ClientState {
session: None,
user_id: Some(user.clone()),
device_id: None,
sync_token: Some("hello".into()),
ignored_users: vec![user],
push_ruleset: None,
};
assert_eq!(
r#"{"session":null,"sync_token":"hello","ignored_users":["@example:example.com"],"push_ruleset":null}"#,
r#"{"user_id":"@example:example.com","device_id":null,"sync_token":"hello","ignored_users":["@example:example.com"],"push_ruleset":null}"#,
serde_json::to_string(&state).unwrap()
);
@ -141,7 +138,8 @@ mod test {
let room = Room::new(&id, &user);
let state = ClientState {
session: None,
user_id: Some(user.clone()),
device_id: None,
sync_token: Some("hello".into()),
ignored_users: vec![user],
push_ruleset: None,

View File

@ -2,14 +2,25 @@ use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::fs as async_fs;
use tokio::sync::RwLock;
use super::{ClientState, StateStore};
use crate::identifiers::RoomId;
use crate::{Error, Result, Room};
/// A default `StateStore` implementation that serializes state as json
/// and saves it to disk.
///
/// When logged in the `JsonStore` appends the user_id to it's folder path,
/// so all files are saved in `my_client/user_id/*`.
pub struct JsonStore {
path: PathBuf,
path: Arc<RwLock<PathBuf>>,
user_path_set: AtomicBool,
}
impl JsonStore {
@ -22,39 +33,30 @@ impl JsonStore {
std::fs::create_dir_all(p)?;
}
Ok(Self {
path: p.to_path_buf(),
path: Arc::new(RwLock::new(p.to_path_buf())),
user_path_set: AtomicBool::new(false),
})
}
}
#[async_trait::async_trait]
impl StateStore for JsonStore {
async fn initial_use(&self) -> Result<bool> {
let mut path = self.path.clone();
path.push("client.json");
Ok(fs::read_to_string(path).map_or(false, |s| !s.is_empty()))
}
async fn load_client_state(&self) -> Result<ClientState> {
let mut path = self.path.clone();
async fn load_client_state(&self) -> Result<Option<ClientState>> {
let mut path = self.path.read().await.clone();
path.push("client.json");
let file = OpenOptions::new().read(true).open(path)?;
let reader = BufReader::new(file);
serde_json::from_reader(reader).map_err(Error::from)
}
async fn load_room_state(&self, room_id: &RoomId) -> Result<Room> {
let mut path = self.path.clone();
path.push(&format!("rooms/{}.json", room_id));
let file = OpenOptions::new().read(true).open(path)?;
let reader = BufReader::new(file);
serde_json::from_reader(reader).map_err(Error::from)
let json = async_fs::read_to_string(path)
.await
.map_or(String::default(), |s| s);
if json.is_empty() {
Ok(None)
} else {
serde_json::from_str(&json).map(Some).map_err(Error::from)
}
}
async fn load_all_rooms(&self) -> Result<HashMap<RoomId, Room>> {
let mut path = self.path.clone();
let mut path = self.path.read().await.clone();
path.push("rooms");
let mut rooms_map = HashMap::new();
@ -78,7 +80,16 @@ impl StateStore for JsonStore {
}
async fn store_client_state(&self, state: ClientState) -> Result<()> {
let mut path = self.path.clone();
if !self.user_path_set.load(Ordering::SeqCst) {
if let Some(user) = &state.user_id {
self.user_path_set.swap(true, Ordering::SeqCst);
self.path
.write()
.await
.push(format!("{}", user.localpart()))
}
}
let mut path = self.path.read().await.clone();
path.push("client.json");
if !Path::new(&path).exists() {
@ -101,7 +112,11 @@ impl StateStore for JsonStore {
}
async fn store_room_state(&self, room: &Room) -> Result<()> {
let mut path = self.path.clone();
if !self.user_path_set.load(Ordering::SeqCst) {
// TODO Error here, should the load methods also error?
}
let mut path = self.path.read().await.clone();
path.push(&format!("rooms/{}.json", room.room_id));
if !Path::new(&path).exists() {
@ -170,11 +185,22 @@ mod test {
async fn test_store_client_state() {
let path: &Path = &PATH;
let user = UserId::try_from("@example:example.com").unwrap();
let store = JsonStore::open(path).unwrap();
let state = ClientState::default();
store.store_client_state(state).await.unwrap();
let state = ClientState {
user_id: Some(user.clone()),
device_id: None,
sync_token: Some("hello".into()),
ignored_users: vec![user],
push_ruleset: None,
};
store.store_client_state(state.clone()).await.unwrap();
let loaded = store.load_client_state().await.unwrap();
assert_eq!(loaded, ClientState::default());
assert_eq!(loaded, Some(state));
}
#[tokio::test]
@ -191,8 +217,8 @@ mod test {
let room = Room::new(&id, &user);
store.store_room_state(&room).await.unwrap();
let loaded = store.load_room_state(&id).await.unwrap();
assert_eq!(loaded, Room::new(&id, &user));
let loaded = store.load_all_rooms().await.unwrap();
assert_eq!(loaded.get(&id), Some(&Room::new(&id, &user)));
}
#[tokio::test]