state_store: load_client takes a sessions obj, docs, copyright, move state store into base client

master
Devin R 2020-04-29 07:00:14 -04:00
parent 83f3fc6796
commit ef560fd545
5 changed files with 90 additions and 71 deletions

View File

@ -675,8 +675,8 @@ impl AsyncClient {
let decrypted_event = {
let mut client = self.base_client.write().await;
let mut timeline_update = false;
let decrypt_ev = client
.receive_joined_timeline_event(room_id, &mut event, &mut timeline_update)
let (decrypt_ev, timeline_update) = client
.receive_joined_timeline_event(room_id, &mut event)
.await;
if timeline_update {
updated = true;
@ -746,7 +746,7 @@ impl AsyncClient {
}
let mut client = self.base_client.write().await;
client.receive_sync_response(&mut response).await;
client.receive_sync_response(&mut response, updated).await?;
if updated {
if let Some(store) = client.state_store.as_ref() {

View File

@ -76,7 +76,10 @@ pub struct Client {
/// Any implementor of EventEmitter will act as the callbacks for various
/// events.
pub event_emitter: Option<Box<dyn EventEmitter>>,
/// Any implementor of `StateStore` will be called to save `Room` and
/// some `BaseClient` state during `AsyncClient::sync` calls.
///
/// There is a default implementation `JsonStore` that saves JSON to disk.
pub state_store: Option<Box<dyn StateStore>>,
/// Does the `Client` need to sync with the state store.
needs_state_store_sync: bool,
@ -148,41 +151,42 @@ impl Client {
/// Returns `true` when a sync has successfully completed.
pub(crate) async fn sync_with_state_store(&mut self) -> Result<bool> {
if let Some(store) = self.state_store.as_ref() {
if let Some(client_state) = store.load_client_state().await? {
let ClientState {
user_id,
device_id,
sync_token,
ignored_users,
push_ruleset,
} = client_state;
if let Some(sess) = self.session.as_ref() {
if let Some(client_state) = store.load_client_state(sess).await? {
let ClientState {
user_id,
device_id,
sync_token,
ignored_users,
push_ruleset,
} = client_state;
if let Some(sess) = self.session.as_mut() {
if let Some(device) = device_id {
sess.device_id = device;
}
if let Some(user) = user_id {
sess.user_id = user;
if let Some(sess) = self.session.as_mut() {
if let Some(device) = device_id {
sess.device_id = device;
}
if let Some(user) = user_id {
sess.user_id = user;
}
}
self.sync_token = sync_token;
self.ignored_users = ignored_users;
self.push_ruleset = push_ruleset;
} else {
// return false and continues with a sync request then save the state and create
// and populate the files during the sync
return Ok(false);
}
self.sync_token = sync_token;
self.ignored_users = ignored_users;
self.push_ruleset = push_ruleset;
} else {
// return false and continues with a sync request then save the state and create
// and populate the files during the sync
return Ok(false);
let mut rooms = store.load_all_rooms().await?;
self.joined_rooms = rooms
.drain()
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
.collect();
self.needs_state_store_sync = false;
}
let mut rooms = store.load_all_rooms().await?;
self.joined_rooms = rooms
.drain()
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
.collect();
self.needs_state_store_sync = false;
}
Ok(!self.needs_state_store_sync)
}
@ -278,23 +282,19 @@ impl Client {
/// Receive a timeline event for a joined room and update the client state.
///
/// If the event was a encrypted room event and decryption was successful
/// the decrypted event will be returned, otherwise None.
/// Returns a tuple of the successfully decrypted event, or None on failure and
/// a bool, true when the `Room` state has been updated.
///
/// # Arguments
///
/// * `room_id` - The unique id of the room the event belongs to.
///
/// * `event` - The event that should be handled by the client.
///
/// * `did_update` - This is used internally to confirm when the state has
/// been updated.
pub async fn receive_joined_timeline_event(
&mut self,
room_id: &RoomId,
event: &mut EventJson<RoomEvent>,
did_update: &mut bool,
) -> Option<EventJson<RoomEvent>> {
) -> (Option<EventJson<RoomEvent>>, bool) {
match event.deserialize() {
#[allow(unused_mut)]
Ok(mut e) => {
@ -319,11 +319,9 @@ impl Client {
}
let mut room = self.get_or_create_room(&room_id).write().await;
// TODO is passing in the bool to use in `AsyncClient::sync` ok here
*did_update = room.receive_timeline_event(&e);
decrypted_event
(decrypted_event, room.receive_timeline_event(&e))
}
_ => None,
_ => (None, false),
}
}
@ -419,7 +417,13 @@ impl Client {
/// # Arguments
///
/// * `response` - The response that we received after a successful sync.
pub async fn receive_sync_response(&mut self, response: &mut api::sync::sync_events::Response) {
///
/// * `did_update` - Signals to the `StateStore` if the client state needs updating.
pub async fn receive_sync_response(
&mut self,
response: &mut api::sync::sync_events::Response,
did_update: bool,
) -> Result<()> {
self.sync_token = Some(response.next_batch.clone());
#[cfg(feature = "encryption")]
@ -442,6 +446,14 @@ impl Client {
}
}
}
if did_update {
if let Some(store) = self.state_store.as_ref() {
let state = ClientState::from_base_client(&self);
store.store_client_state(state).await?;
}
}
Ok(())
}
/// Should account or one-time keys be uploaded to the server.

View File

@ -1,4 +1,4 @@
// Copyright 2020 Damir Jelić
// Copyright 2020 Devin Ragotzy
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -23,8 +23,7 @@ use serde::{Deserialize, Serialize};
use crate::base_client::{Client as BaseClient, Token};
use crate::events::push_rules::Ruleset;
use crate::identifiers::{DeviceId, RoomId, UserId};
use crate::models::Room;
use crate::Result;
use crate::{Result, Room, Session};
/// `ClientState` holds all the information to restore a `BaseClient`
/// except the `access_token` as the default store is not secure.
@ -72,7 +71,7 @@ pub trait StateStore: Send + Sync {
///
/// An `Option::None` should be returned only if the `StateStore` tries to
/// load but no state has been stored.
async fn load_client_state(&self) -> Result<Option<ClientState>>;
async fn load_client_state(&self, _: &Session) -> Result<Option<ClientState>>;
/// Load the state of all `Room`s.
///
/// This will be mapped over in the client in order to store `Room`s in an async safe way.

View File

@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::{BufReader, BufWriter, Write};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicBool, Ordering},
@ -8,16 +7,17 @@ use std::sync::{
};
use tokio::fs as async_fs;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
use super::{ClientState, StateStore};
use crate::identifiers::RoomId;
use crate::{Error, Result, Room};
use crate::{Error, Result, Room, Session};
/// A default `StateStore` implementation that serializes state as json
/// and saves it to disk.
///
/// When logged in the `JsonStore` appends the user_id to it's folder path,
/// so all files are saved in `my_client/user_id/*`.
/// so all files are saved in `my_client/user_id_localpart/*`.
pub struct JsonStore {
path: Arc<RwLock<PathBuf>>,
user_path_set: AtomicBool,
@ -41,7 +41,12 @@ impl JsonStore {
#[async_trait::async_trait]
impl StateStore for JsonStore {
async fn load_client_state(&self) -> Result<Option<ClientState>> {
async fn load_client_state(&self, sess: &Session) -> Result<Option<ClientState>> {
if !self.user_path_set.load(Ordering::SeqCst) {
self.user_path_set.swap(true, Ordering::SeqCst);
self.path.write().await.push(sess.user_id.localpart())
}
let mut path = self.path.read().await.clone();
path.push("client.json");
@ -67,10 +72,9 @@ impl StateStore for JsonStore {
continue;
}
let f_hdl = OpenOptions::new().read(true).open(&file)?;
let reader = BufReader::new(f_hdl);
let json = async_fs::read_to_string(&file).await?;
let room = serde_json::from_reader::<_, Room>(reader).map_err(Error::from)?;
let room = serde_json::from_str::<Room>(&json).map_err(Error::from)?;
let room_id = room.room_id.clone();
rooms_map.insert(room_id, room);
@ -97,15 +101,13 @@ impl StateStore for JsonStore {
let json = serde_json::to_string(&state).map_err(Error::from)?;
let file = OpenOptions::new()
let mut file = async_fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)?;
let mut writer = BufWriter::new(file);
writer.write_all(json.as_bytes())?;
Ok(())
.open(path)
.await?;
file.write_all(json.as_bytes()).await.map_err(Error::from)
}
async fn store_room_state(&self, room: &Room) -> Result<()> {
@ -125,15 +127,13 @@ impl StateStore for JsonStore {
let json = serde_json::to_string(&room).map_err(Error::from)?;
let file = OpenOptions::new()
let mut file = async_fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)?;
let mut writer = BufWriter::new(file);
writer.write_all(json.as_bytes())?;
Ok(())
.open(path)
.await?;
file.write_all(json.as_bytes()).await.map_err(Error::from)
}
}
@ -187,6 +187,12 @@ mod test {
let user = UserId::try_from("@example:example.com").unwrap();
let sess = Session {
access_token: "32nj9zu034btz90".to_string(),
user_id: user.clone(),
device_id: "Tester".to_string(),
};
let store = JsonStore::open(path).unwrap();
let state = ClientState {
@ -198,7 +204,9 @@ mod test {
};
store.store_client_state(state.clone()).await.unwrap();
let loaded = store.load_client_state().await.unwrap();
let store = JsonStore::open(path).unwrap();
let loaded = store.load_client_state(&sess).await.unwrap();
assert_eq!(loaded, Some(state));
}

View File

@ -367,7 +367,7 @@ impl ClientTestRunner {
}
for event in &self.room_events {
cli.receive_joined_timeline_event(room_id, &mut EventJson::from(event), &mut false)
cli.receive_joined_timeline_event(room_id, &mut EventJson::from(event))
.await;
}
for event in &self.presence_events {