state_store: fix docs, test client with default state store
parent
ad7a18d50a
commit
5491838228
13
design.md
13
design.md
|
@ -12,14 +12,14 @@ The highest level structure that ties the other pieces of functionality together
|
||||||
|
|
||||||
#### Base Client/Client State Machine
|
#### Base Client/Client State Machine
|
||||||
In addition to Http, the `AsyncClient` passes along methods from the `BaseClient` that deal with `Room`s and `RoomMember`s. This allows the client to keep track of more complicated information that needs to be calculated in some way.
|
In addition to Http, the `AsyncClient` passes along methods from the `BaseClient` that deal with `Room`s and `RoomMember`s. This allows the client to keep track of more complicated information that needs to be calculated in some way.
|
||||||
- human readable room names
|
- human-readable room names
|
||||||
- power level?
|
- power level?
|
||||||
- ignored list?
|
- ignored list?
|
||||||
- push rulesset?
|
- push rulesset?
|
||||||
- more?
|
- more?
|
||||||
|
|
||||||
#### Crypto State Machine
|
#### Crypto State Machine
|
||||||
Given a Matrix response the crypto machine will update it's internal state, along with encryption information this means keeping track of when to encrypt. It has knowledge of when encryption needs to happen and can be asked from the `BaseClient`. The crypto state machine is given responses that relate to encryption and can create encrypted request bodies for encryption related requests. Basically it tells the `BaseClient` to send a to-device messages out and the `BaseClient` is responsible for notifying the crypto state machine when it sent the message so crypto can update state.
|
Given a Matrix response the crypto machine will update it's internal state, along with encryption information this means keeping track of when to encrypt. It knows when encryption needs to happen based on signals from the `BaseClient`. The crypto state machine is given responses that relate to encryption and can create encrypted request bodies for encryption-related requests. Basically it tells the `BaseClient` to send to-device messages out, and the `BaseClient` is responsible for notifying the crypto state machine when it sent the message so crypto can update state.
|
||||||
|
|
||||||
#### Client State/Room and RoomMember
|
#### Client State/Room and RoomMember
|
||||||
The `BaseClient` is responsible for keeping state in sync through the `IncomingResponse`s of `AsyncClient` or querying the `StateStore`. By processing and then delegating incoming `RoomEvent`s, `StateEvent`s, `PresenceEvent`, `IncomingAccountData` and `EphemeralEvent`s to the correct `Room` in the base clients `HashMap<RoomId, Room>` or further to `Room`'s `RoomMember` via the members `HashMap<UserId, RoomMember>`. The `BaseClient` is also responsible for emitting the incoming events to the `EventEmitter` trait.
|
The `BaseClient` is responsible for keeping state in sync through the `IncomingResponse`s of `AsyncClient` or querying the `StateStore`. By processing and then delegating incoming `RoomEvent`s, `StateEvent`s, `PresenceEvent`, `IncomingAccountData` and `EphemeralEvent`s to the correct `Room` in the base clients `HashMap<RoomId, Room>` or further to `Room`'s `RoomMember` via the members `HashMap<UserId, RoomMember>`. The `BaseClient` is also responsible for emitting the incoming events to the `EventEmitter` trait.
|
||||||
|
@ -87,12 +87,13 @@ pub struct RoomMember {
|
||||||
```
|
```
|
||||||
|
|
||||||
#### State Store
|
#### State Store
|
||||||
The `BaseClient` also has access to a `dyn StateStore` this is an abstraction around a "database" to keep the client state without requesting a full sync from the server on startup. A default implementation that serializes/deserializes JSON to files in a specified directory can be used. The user can also implement `StateStore` to fit any storage solution they choose.
|
The `BaseClient` also has access to a `dyn StateStore` this is an abstraction around a "database" to keep the client state without requesting a full sync from the server on startup. A default implementation that serializes/deserializes JSON to files in a specified directory can be used. The user can also implement `StateStore` to fit any storage solution they choose. The base client handles the storage automatically. There "may be/are TODO" ways for the user to interact directly. The room event handling methods signal if the state was modified; if so, we check if some room state file needs to be overwritten.
|
||||||
- load
|
- open
|
||||||
- store/save
|
- load client/room or rooms
|
||||||
|
- store client/room
|
||||||
- update ??
|
- update ??
|
||||||
|
|
||||||
The state store will restore our client state in the `BaseClient` and client authors can just get the latest state that they want to present from the client object. No need to ask the state store for it, this may change if custom setups request this. `StateStore`'s base is load/store and internally to the crate update the `BaseClient`.
|
The state store will restore our client state in the `BaseClient` and client authors can just get the latest state that they want to present from the client object. No need to ask the state store for it, this may change if custom setups request this. `StateStore`'s main purpose is to provide load/store functionality and, internally to the crate, update the `BaseClient`.
|
||||||
|
|
||||||
#### Event Emitter
|
#### Event Emitter
|
||||||
The consumer of this crate can implement the `EventEmitter` trait for full control over how incoming events are handled by their client. If that isn't enough it is possible to receive every incoming response with the `AsyncClient::sync_forever` callback.
|
The consumer of this crate can implement the `EventEmitter` trait for full control over how incoming events are handled by their client. If that isn't enough it is possible to receive every incoming response with the `AsyncClient::sync_forever` callback.
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
use std::result::Result as StdResult;
|
use std::result::Result as StdResult;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
@ -46,6 +48,7 @@ use crate::api;
|
||||||
use crate::base_client::Client as BaseClient;
|
use crate::base_client::Client as BaseClient;
|
||||||
use crate::models::Room;
|
use crate::models::Room;
|
||||||
use crate::session::Session;
|
use crate::session::Session;
|
||||||
|
use crate::state::{ClientState, JsonStore, StateStore};
|
||||||
use crate::VERSION;
|
use crate::VERSION;
|
||||||
use crate::{Error, EventEmitter, Result};
|
use crate::{Error, EventEmitter, Result};
|
||||||
|
|
||||||
|
@ -62,6 +65,8 @@ pub struct AsyncClient {
|
||||||
http_client: reqwest::Client,
|
http_client: reqwest::Client,
|
||||||
/// User session data.
|
/// User session data.
|
||||||
pub(crate) base_client: Arc<RwLock<BaseClient>>,
|
pub(crate) base_client: Arc<RwLock<BaseClient>>,
|
||||||
|
/// The path to the default state store.
|
||||||
|
state_store_path: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for AsyncClient {
|
impl std::fmt::Debug for AsyncClient {
|
||||||
|
@ -70,7 +75,7 @@ impl std::fmt::Debug for AsyncClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default)]
|
||||||
/// Configuration for the creation of the `AsyncClient`.
|
/// Configuration for the creation of the `AsyncClient`.
|
||||||
///
|
///
|
||||||
/// # Example
|
/// # Example
|
||||||
|
@ -88,6 +93,19 @@ pub struct AsyncClientConfig {
|
||||||
proxy: Option<reqwest::Proxy>,
|
proxy: Option<reqwest::Proxy>,
|
||||||
user_agent: Option<HeaderValue>,
|
user_agent: Option<HeaderValue>,
|
||||||
disable_ssl_verification: bool,
|
disable_ssl_verification: bool,
|
||||||
|
store_path: Option<PathBuf>,
|
||||||
|
state_store: Option<Box<dyn StateStore>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for AsyncClientConfig {
|
||||||
|
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> StdResult<(), std::fmt::Error> {
|
||||||
|
fmt.debug_struct("AsyncClientConfig")
|
||||||
|
.field("proxy", &self.proxy)
|
||||||
|
.field("user_agent", &self.user_agent)
|
||||||
|
.field("disable_ssl_verification", &self.disable_ssl_verification)
|
||||||
|
.field("store_path", &self.store_path)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncClientConfig {
|
impl AsyncClientConfig {
|
||||||
|
@ -129,6 +147,23 @@ impl AsyncClientConfig {
|
||||||
self.user_agent = Some(HeaderValue::from_str(user_agent)?);
|
self.user_agent = Some(HeaderValue::from_str(user_agent)?);
|
||||||
Ok(self)
|
Ok(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the path for the default `StateStore`.
|
||||||
|
///
|
||||||
|
/// When the path is set `AsyncClient` will set the state store
|
||||||
|
/// to `JsonStore`.
|
||||||
|
pub fn state_store_path<P: AsRef<Path>>(mut self, path: P) -> Self {
|
||||||
|
self.store_path = Some(path.as_ref().to_owned());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set a custom implementation of a `StateStore`.
|
||||||
|
///
|
||||||
|
/// The state store should be "connected" before being set.
|
||||||
|
pub fn state_store(mut self, store: Box<dyn StateStore>) -> Self {
|
||||||
|
self.state_store = Some(store);
|
||||||
|
self
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone)]
|
#[derive(Debug, Default, Clone)]
|
||||||
|
@ -253,10 +288,20 @@ impl AsyncClient {
|
||||||
|
|
||||||
let http_client = http_client.default_headers(headers).build()?;
|
let http_client = http_client.default_headers(headers).build()?;
|
||||||
|
|
||||||
|
let mut base_client = BaseClient::new(session)?;
|
||||||
|
if let Some(path) = config.store_path.as_ref() {
|
||||||
|
let store = JsonStore;
|
||||||
|
store.open(path)?;
|
||||||
|
base_client.state_store = Some(Box::new(store));
|
||||||
|
} else if let Some(store) = config.state_store {
|
||||||
|
base_client.state_store = Some(store);
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
homeserver,
|
homeserver,
|
||||||
http_client,
|
http_client,
|
||||||
base_client: Arc::new(RwLock::new(BaseClient::new(session)?)),
|
base_client: Arc::new(RwLock::new(base_client)),
|
||||||
|
state_store_path: config.store_path,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,7 +382,15 @@ impl AsyncClient {
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request).await?;
|
||||||
let mut client = self.base_client.write().await;
|
let mut client = self.base_client.write().await;
|
||||||
client.receive_login_response(&response).await?;
|
// TODO avoid allocation somehow?
|
||||||
|
let path = self.state_store_path.as_ref().map(|p| {
|
||||||
|
let mut path = PathBuf::from(p);
|
||||||
|
path.push(response.user_id.to_string());
|
||||||
|
path
|
||||||
|
});
|
||||||
|
client
|
||||||
|
.receive_login_response(&response, path.as_ref())
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
@ -485,7 +538,7 @@ impl AsyncClient {
|
||||||
/// .name("name")
|
/// .name("name")
|
||||||
/// .room_version("v1.0");
|
/// .room_version("v1.0");
|
||||||
///
|
///
|
||||||
/// let mut cli = AsyncClient::<(), ()>::new(homeserver, None).unwrap();
|
/// let mut cli = AsyncClient::new(homeserver, None).unwrap();
|
||||||
/// # use futures::executor::block_on;
|
/// # use futures::executor::block_on;
|
||||||
/// # block_on(async {
|
/// # block_on(async {
|
||||||
/// assert!(cli.create_room(builder).await.is_ok());
|
/// assert!(cli.create_room(builder).await.is_ok());
|
||||||
|
@ -529,7 +582,7 @@ impl AsyncClient {
|
||||||
/// .direction(Direction::Backward)
|
/// .direction(Direction::Backward)
|
||||||
/// .limit(UInt::new(10).unwrap());
|
/// .limit(UInt::new(10).unwrap());
|
||||||
///
|
///
|
||||||
/// let mut cli = AsyncClient::<(), ()>::new(homeserver, None).unwrap();
|
/// let mut cli = AsyncClient::new(homeserver, None).unwrap();
|
||||||
/// # use futures::executor::block_on;
|
/// # use futures::executor::block_on;
|
||||||
/// # block_on(async {
|
/// # block_on(async {
|
||||||
/// assert!(cli.room_messages(builder).await.is_ok());
|
/// assert!(cli.room_messages(builder).await.is_ok());
|
||||||
|
@ -560,12 +613,15 @@ impl AsyncClient {
|
||||||
|
|
||||||
let mut response = self.send(request).await?;
|
let mut response = self.send(request).await?;
|
||||||
|
|
||||||
|
let mut updated = false;
|
||||||
for (room_id, room) in &mut response.rooms.join {
|
for (room_id, room) in &mut response.rooms.join {
|
||||||
let matrix_room = {
|
let matrix_room = {
|
||||||
let mut client = self.base_client.write().await;
|
let mut client = self.base_client.write().await;
|
||||||
for event in &room.state.events {
|
for event in &room.state.events {
|
||||||
if let EventResult::Ok(e) = event {
|
if let EventResult::Ok(e) = event {
|
||||||
client.receive_joined_state_event(&room_id, &e).await;
|
if client.receive_joined_state_event(&room_id, &e).await {
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -586,9 +642,14 @@ impl AsyncClient {
|
||||||
for mut event in &mut room.timeline.events {
|
for mut event in &mut room.timeline.events {
|
||||||
let decrypted_event = {
|
let decrypted_event = {
|
||||||
let mut client = self.base_client.write().await;
|
let mut client = self.base_client.write().await;
|
||||||
client
|
let mut timeline_update = false;
|
||||||
.receive_joined_timeline_event(room_id, &mut event)
|
let decrypt_ev = client
|
||||||
.await
|
.receive_joined_timeline_event(room_id, &mut event, &mut timeline_update)
|
||||||
|
.await;
|
||||||
|
if timeline_update {
|
||||||
|
updated = true;
|
||||||
|
};
|
||||||
|
decrypt_ev
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(e) = decrypted_event {
|
if let Some(e) = decrypted_event {
|
||||||
|
@ -606,7 +667,9 @@ impl AsyncClient {
|
||||||
{
|
{
|
||||||
if let EventResult::Ok(e) = account_data {
|
if let EventResult::Ok(e) = account_data {
|
||||||
let mut client = self.base_client.write().await;
|
let mut client = self.base_client.write().await;
|
||||||
client.receive_account_data_event(&room_id, e).await;
|
if client.receive_account_data_event(&room_id, e).await {
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
client.emit_account_data_event(room_id, e).await;
|
client.emit_account_data_event(room_id, e).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -619,7 +682,9 @@ impl AsyncClient {
|
||||||
{
|
{
|
||||||
if let EventResult::Ok(e) = presence {
|
if let EventResult::Ok(e) = presence {
|
||||||
let mut client = self.base_client.write().await;
|
let mut client = self.base_client.write().await;
|
||||||
client.receive_presence_event(&room_id, e).await;
|
if client.receive_presence_event(&room_id, e).await {
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
|
||||||
client.emit_presence_event(room_id, e).await;
|
client.emit_presence_event(room_id, e).await;
|
||||||
}
|
}
|
||||||
|
@ -630,17 +695,37 @@ impl AsyncClient {
|
||||||
{
|
{
|
||||||
if let EventResult::Ok(e) = ephemeral {
|
if let EventResult::Ok(e) = ephemeral {
|
||||||
let mut client = self.base_client.write().await;
|
let mut client = self.base_client.write().await;
|
||||||
client.receive_ephemeral_event(&room_id, e).await;
|
if client.receive_ephemeral_event(&room_id, e).await {
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
|
||||||
client.emit_ephemeral_event(room_id, e).await;
|
client.emit_ephemeral_event(room_id, e).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if updated {
|
||||||
|
if let Some(store) = self.base_client.read().await.state_store.as_ref() {
|
||||||
|
if let Some(path) = self.state_store_path.as_ref() {
|
||||||
|
store
|
||||||
|
.store_room_state(&path, matrix_room.read().await.deref())
|
||||||
|
.await?;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut client = self.base_client.write().await;
|
let mut client = self.base_client.write().await;
|
||||||
client.receive_sync_response(&mut response).await;
|
client.receive_sync_response(&mut response).await;
|
||||||
|
|
||||||
|
if updated {
|
||||||
|
if let Some(store) = client.state_store.as_ref() {
|
||||||
|
if let Some(path) = self.state_store_path.as_ref() {
|
||||||
|
let state = ClientState::from_base_client(&client);
|
||||||
|
store.store_client_state(&path, state).await?;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -673,7 +758,7 @@ impl AsyncClient {
|
||||||
/// # use futures::executor::block_on;
|
/// # use futures::executor::block_on;
|
||||||
/// # block_on(async {
|
/// # block_on(async {
|
||||||
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
|
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
|
||||||
/// # let mut client = AsyncClient::<(), ()>::new(homeserver, None).unwrap();
|
/// # let mut client = AsyncClient::new(homeserver, None).unwrap();
|
||||||
///
|
///
|
||||||
/// use async_std::sync::channel;
|
/// use async_std::sync::channel;
|
||||||
///
|
///
|
||||||
|
@ -862,7 +947,7 @@ impl AsyncClient {
|
||||||
/// use matrix_sdk::events::room::message::{MessageEventContent, TextMessageEventContent};
|
/// use matrix_sdk::events::room::message::{MessageEventContent, TextMessageEventContent};
|
||||||
/// # block_on(async {
|
/// # block_on(async {
|
||||||
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
|
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
|
||||||
/// # let mut client = AsyncClient::<(), ()>::new(homeserver, None).unwrap();
|
/// # let mut client = AsyncClient::new(homeserver, None).unwrap();
|
||||||
/// # let room_id = RoomId::try_from("!test:localhost").unwrap();
|
/// # let room_id = RoomId::try_from("!test:localhost").unwrap();
|
||||||
/// use uuid::Uuid;
|
/// use uuid::Uuid;
|
||||||
///
|
///
|
||||||
|
|
|
@ -19,6 +19,7 @@ use std::collections::HashSet;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
use std::result::Result as StdResult;
|
use std::result::Result as StdResult;
|
||||||
|
|
||||||
|
@ -34,7 +35,7 @@ use crate::events::EventResult;
|
||||||
use crate::identifiers::{RoomId, UserId};
|
use crate::identifiers::{RoomId, UserId};
|
||||||
use crate::models::Room;
|
use crate::models::Room;
|
||||||
use crate::session::Session;
|
use crate::session::Session;
|
||||||
use crate::state::StateStore;
|
use crate::state::{ClientState, StateStore};
|
||||||
use crate::EventEmitter;
|
use crate::EventEmitter;
|
||||||
|
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
|
@ -144,6 +145,7 @@ impl Client {
|
||||||
pub async fn receive_login_response(
|
pub async fn receive_login_response(
|
||||||
&mut self,
|
&mut self,
|
||||||
response: &api::session::login::Response,
|
response: &api::session::login::Response,
|
||||||
|
store_path: Option<&PathBuf>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let session = Session {
|
let session = Session {
|
||||||
access_token: response.access_token.clone(),
|
access_token: response.access_token.clone(),
|
||||||
|
@ -158,6 +160,27 @@ impl Client {
|
||||||
*olm = Some(OlmMachine::new(&response.user_id, &response.device_id)?);
|
*olm = Some(OlmMachine::new(&response.user_id, &response.device_id)?);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(path) = store_path {
|
||||||
|
if let Some(store) = self.state_store.as_ref() {
|
||||||
|
let ClientState {
|
||||||
|
session,
|
||||||
|
sync_token,
|
||||||
|
ignored_users,
|
||||||
|
push_ruleset,
|
||||||
|
} = store.load_client_state(&path).await?;
|
||||||
|
let mut rooms = store.load_all_rooms(&path).await?;
|
||||||
|
|
||||||
|
self.joined_rooms = rooms
|
||||||
|
.drain()
|
||||||
|
.map(|(k, room)| (k, Arc::new(RwLock::new(room))))
|
||||||
|
.collect();
|
||||||
|
self.session = session;
|
||||||
|
self.sync_token = sync_token;
|
||||||
|
self.ignored_users = ignored_users;
|
||||||
|
self.push_ruleset = push_ruleset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,10 +258,14 @@ impl Client {
|
||||||
/// * `room_id` - The unique id of the room the event belongs to.
|
/// * `room_id` - The unique id of the room the event belongs to.
|
||||||
///
|
///
|
||||||
/// * `event` - The event that should be handled by the client.
|
/// * `event` - The event that should be handled by the client.
|
||||||
|
///
|
||||||
|
/// * `did_update` - This is used internally to confirm when the state has
|
||||||
|
/// been updated.
|
||||||
pub async fn receive_joined_timeline_event(
|
pub async fn receive_joined_timeline_event(
|
||||||
&mut self,
|
&mut self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
event: &mut EventResult<RoomEvent>,
|
event: &mut EventResult<RoomEvent>,
|
||||||
|
did_update: &mut bool,
|
||||||
) -> Option<EventResult<RoomEvent>> {
|
) -> Option<EventResult<RoomEvent>> {
|
||||||
match event {
|
match event {
|
||||||
EventResult::Ok(e) => {
|
EventResult::Ok(e) => {
|
||||||
|
@ -263,7 +290,8 @@ impl Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut room = self.get_or_create_room(&room_id).write().await;
|
let mut room = self.get_or_create_room(&room_id).write().await;
|
||||||
room.receive_timeline_event(e);
|
// Not sure what the best way to do this is ??
|
||||||
|
*did_update = room.receive_timeline_event(e);
|
||||||
decrypted_event
|
decrypted_event
|
||||||
}
|
}
|
||||||
_ => None,
|
_ => None,
|
||||||
|
|
|
@ -29,7 +29,7 @@ use js_int::UInt;
|
||||||
/// .visibility(Visibility::Public)
|
/// .visibility(Visibility::Public)
|
||||||
/// .name("name")
|
/// .name("name")
|
||||||
/// .room_version("v1.0");
|
/// .room_version("v1.0");
|
||||||
/// let mut cli = AsyncClient::<(), ()>::new(homeserver, None).unwrap();
|
/// let mut cli = AsyncClient::new(homeserver, None).unwrap();
|
||||||
/// cli.create_room(builder).await;
|
/// cli.create_room(builder).await;
|
||||||
/// # })
|
/// # })
|
||||||
/// ```
|
/// ```
|
||||||
|
@ -186,7 +186,7 @@ impl Into<create_room::Request> for RoomBuilder {
|
||||||
/// # rt.block_on(async {
|
/// # rt.block_on(async {
|
||||||
/// # let room_id = RoomId::new(homeserver.as_str()).unwrap();
|
/// # let room_id = RoomId::new(homeserver.as_str()).unwrap();
|
||||||
/// # let last_sync_token = "".to_string();;
|
/// # let last_sync_token = "".to_string();;
|
||||||
/// let mut cli = AsyncClient::<(), ()>::new(homeserver, None).unwrap();
|
/// let mut cli = AsyncClient::new(homeserver, None).unwrap();
|
||||||
///
|
///
|
||||||
/// let mut builder = MessagesRequestBuilder::new();
|
/// let mut builder = MessagesRequestBuilder::new();
|
||||||
/// builder.room_id(room_id)
|
/// builder.room_id(room_id)
|
||||||
|
|
|
@ -21,7 +21,7 @@ pub use state_store::JsonStore;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::base_client::Token;
|
use crate::base_client::{Client as BaseClient, Token};
|
||||||
use crate::events::push_rules::Ruleset;
|
use crate::events::push_rules::Ruleset;
|
||||||
use crate::identifiers::{RoomId, UserId};
|
use crate::identifiers::{RoomId, UserId};
|
||||||
use crate::models::Room;
|
use crate::models::Room;
|
||||||
|
@ -40,22 +40,41 @@ pub struct ClientState {
|
||||||
pub push_ruleset: Option<Ruleset>,
|
pub push_ruleset: Option<Ruleset>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ClientState {
|
||||||
|
pub fn from_base_client(client: &BaseClient) -> ClientState {
|
||||||
|
let BaseClient {
|
||||||
|
session,
|
||||||
|
sync_token,
|
||||||
|
ignored_users,
|
||||||
|
push_ruleset,
|
||||||
|
..
|
||||||
|
} = client;
|
||||||
|
Self {
|
||||||
|
session: session.clone(),
|
||||||
|
sync_token: sync_token.clone(),
|
||||||
|
ignored_users: ignored_users.clone(),
|
||||||
|
push_ruleset: push_ruleset.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Abstraction around the data store to avoid unnecessary request on client initialization.
|
/// Abstraction around the data store to avoid unnecessary request on client initialization.
|
||||||
|
#[async_trait::async_trait]
|
||||||
pub trait StateStore: Send + Sync {
|
pub trait StateStore: Send + Sync {
|
||||||
/// Set up connections or open files to load/save state.
|
/// Set up connections or check files exist to load/save state.
|
||||||
fn open(&self, path: &Path) -> Result<()>;
|
fn open(&self, path: &Path) -> Result<()>;
|
||||||
/// Loads the state of `BaseClient` through `StateStore::Store` type.
|
/// Loads the state of `BaseClient` through `StateStore::Store` type.
|
||||||
fn load_client_state(&self, path: &Path) -> Result<ClientState>;
|
async fn load_client_state(&self, path: &Path) -> Result<ClientState>;
|
||||||
/// Load the state of a single `Room` by `RoomId`.
|
/// Load the state of a single `Room` by `RoomId`.
|
||||||
fn load_room_state(&self, path: &Path, room_id: &RoomId) -> Result<Room>;
|
async fn load_room_state(&self, path: &Path, room_id: &RoomId) -> Result<Room>;
|
||||||
/// Load the state of all `Room`s.
|
/// Load the state of all `Room`s.
|
||||||
///
|
///
|
||||||
/// This will be mapped over in the client in order to store `Room`s in an async safe way.
|
/// This will be mapped over in the client in order to store `Room`s in an async safe way.
|
||||||
fn load_all_rooms(&self, path: &Path) -> Result<HashMap<RoomId, Room>>;
|
async fn load_all_rooms(&self, path: &Path) -> Result<HashMap<RoomId, Room>>;
|
||||||
/// Save the current state of the `BaseClient` using the `StateStore::Store` type.
|
/// Save the current state of the `BaseClient` using the `StateStore::Store` type.
|
||||||
fn store_client_state(&self, path: &Path, _: ClientState) -> Result<()>;
|
async fn store_client_state(&self, path: &Path, _: ClientState) -> Result<()>;
|
||||||
/// Save the state a single `Room`.
|
/// Save the state a single `Room`.
|
||||||
fn store_room_state(&self, path: &Path, _: &Room) -> Result<()>;
|
async fn store_room_state(&self, path: &Path, _: &Room) -> Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs::{self, OpenOptions};
|
use std::fs::{self, OpenOptions};
|
||||||
use std::io::{BufReader, BufWriter, Write};
|
use std::io::{BufReader, BufWriter, Write};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::Path;
|
||||||
|
|
||||||
use super::{ClientState, StateStore};
|
use super::{ClientState, StateStore};
|
||||||
use crate::identifiers::RoomId;
|
use crate::identifiers::RoomId;
|
||||||
|
@ -10,6 +10,7 @@ use crate::{Error, Result, Room};
|
||||||
/// and saves it to disk.
|
/// and saves it to disk.
|
||||||
pub struct JsonStore;
|
pub struct JsonStore;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
impl StateStore for JsonStore {
|
impl StateStore for JsonStore {
|
||||||
fn open(&self, path: &Path) -> Result<()> {
|
fn open(&self, path: &Path) -> Result<()> {
|
||||||
if !path.exists() {
|
if !path.exists() {
|
||||||
|
@ -17,7 +18,8 @@ impl StateStore for JsonStore {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
fn load_client_state(&self, path: &Path) -> Result<ClientState> {
|
|
||||||
|
async fn load_client_state(&self, path: &Path) -> Result<ClientState> {
|
||||||
let mut path = path.to_path_buf();
|
let mut path = path.to_path_buf();
|
||||||
path.push("client.json");
|
path.push("client.json");
|
||||||
|
|
||||||
|
@ -26,7 +28,7 @@ impl StateStore for JsonStore {
|
||||||
serde_json::from_reader(reader).map_err(Error::from)
|
serde_json::from_reader(reader).map_err(Error::from)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_room_state(&self, path: &Path, room_id: &RoomId) -> Result<Room> {
|
async fn load_room_state(&self, path: &Path, room_id: &RoomId) -> Result<Room> {
|
||||||
let mut path = path.to_path_buf();
|
let mut path = path.to_path_buf();
|
||||||
path.push(&format!("rooms/{}.json", room_id));
|
path.push(&format!("rooms/{}.json", room_id));
|
||||||
|
|
||||||
|
@ -35,7 +37,7 @@ impl StateStore for JsonStore {
|
||||||
serde_json::from_reader(reader).map_err(Error::from)
|
serde_json::from_reader(reader).map_err(Error::from)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_all_rooms(&self, path: &Path) -> Result<HashMap<RoomId, Room>> {
|
async fn load_all_rooms(&self, path: &Path) -> Result<HashMap<RoomId, Room>> {
|
||||||
let mut path = path.to_path_buf();
|
let mut path = path.to_path_buf();
|
||||||
path.push("rooms");
|
path.push("rooms");
|
||||||
|
|
||||||
|
@ -59,7 +61,7 @@ impl StateStore for JsonStore {
|
||||||
Ok(rooms_map)
|
Ok(rooms_map)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn store_client_state(&self, path: &Path, state: ClientState) -> Result<()> {
|
async fn store_client_state(&self, path: &Path, state: ClientState) -> Result<()> {
|
||||||
let mut path = path.to_path_buf();
|
let mut path = path.to_path_buf();
|
||||||
path.push("client.json");
|
path.push("client.json");
|
||||||
|
|
||||||
|
@ -71,14 +73,18 @@ impl StateStore for JsonStore {
|
||||||
|
|
||||||
let json = serde_json::to_string(&state).map_err(Error::from)?;
|
let json = serde_json::to_string(&state).map_err(Error::from)?;
|
||||||
|
|
||||||
let file = OpenOptions::new().write(true).create(true).open(path)?;
|
let file = OpenOptions::new()
|
||||||
|
.write(true)
|
||||||
|
.create(true)
|
||||||
|
.truncate(true)
|
||||||
|
.open(path)?;
|
||||||
let mut writer = BufWriter::new(file);
|
let mut writer = BufWriter::new(file);
|
||||||
writer.write_all(json.as_bytes())?;
|
writer.write_all(json.as_bytes())?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn store_room_state(&self, path: &Path, room: &Room) -> Result<()> {
|
async fn store_room_state(&self, path: &Path, room: &Room) -> Result<()> {
|
||||||
let mut path = path.to_path_buf();
|
let mut path = path.to_path_buf();
|
||||||
path.push(&format!("rooms/{}.json", room.room_id));
|
path.push(&format!("rooms/{}.json", room.room_id));
|
||||||
|
|
||||||
|
@ -90,7 +96,11 @@ impl StateStore for JsonStore {
|
||||||
|
|
||||||
let json = serde_json::to_string(&room).map_err(Error::from)?;
|
let json = serde_json::to_string(&room).map_err(Error::from)?;
|
||||||
|
|
||||||
let file = OpenOptions::new().write(true).create(true).open(path)?;
|
let file = OpenOptions::new()
|
||||||
|
.write(true)
|
||||||
|
.create(true)
|
||||||
|
.truncate(true)
|
||||||
|
.open(path)?;
|
||||||
let mut writer = BufWriter::new(file);
|
let mut writer = BufWriter::new(file);
|
||||||
writer.write_all(json.as_bytes())?;
|
writer.write_all(json.as_bytes())?;
|
||||||
|
|
||||||
|
@ -104,11 +114,15 @@ mod test {
|
||||||
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
use mockito::{mock, Matcher};
|
||||||
|
|
||||||
use crate::identifiers::{RoomId, UserId};
|
use crate::identifiers::{RoomId, UserId};
|
||||||
|
use crate::{AsyncClient, AsyncClientConfig, Session, SyncSettings};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
/// Limit io tests to one thread at a time.
|
/// Limit io tests to one thread at a time.
|
||||||
|
@ -124,10 +138,13 @@ mod test {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_and_cleanup(test: fn()) {
|
async fn run_and_cleanup<Fut>(test: fn() -> Fut)
|
||||||
|
where
|
||||||
|
Fut: std::future::Future<Output = ()>,
|
||||||
|
{
|
||||||
let _lock = MTX.lock();
|
let _lock = MTX.lock();
|
||||||
|
|
||||||
test();
|
test().await;
|
||||||
|
|
||||||
if PATH.exists() {
|
if PATH.exists() {
|
||||||
let path: &Path = &PATH;
|
let path: &Path = &PATH;
|
||||||
|
@ -135,50 +152,111 @@ mod test {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_store_client_state() {
|
async fn test_store_client_state() {
|
||||||
let store = JsonStore;
|
let store = JsonStore;
|
||||||
let state = ClientState::default();
|
let state = ClientState::default();
|
||||||
store.store_client_state(&PATH, state).unwrap();
|
store.store_client_state(&PATH, state).await.unwrap();
|
||||||
let loaded = store.load_client_state(&PATH).unwrap();
|
let loaded = store.load_client_state(&PATH).await.unwrap();
|
||||||
assert_eq!(loaded, ClientState::default());
|
assert_eq!(loaded, ClientState::default());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[tokio::test]
|
||||||
fn store_client_state() {
|
async fn store_client_state() {
|
||||||
run_and_cleanup(test_store_client_state);
|
run_and_cleanup(test_store_client_state).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_store_room_state() {
|
async fn test_store_room_state() {
|
||||||
let store = JsonStore;
|
let store = JsonStore;
|
||||||
|
|
||||||
let id = RoomId::try_from("!roomid:example.com").unwrap();
|
let id = RoomId::try_from("!roomid:example.com").unwrap();
|
||||||
let user = UserId::try_from("@example:example.com").unwrap();
|
let user = UserId::try_from("@example:example.com").unwrap();
|
||||||
|
|
||||||
let room = Room::new(&id, &user);
|
let room = Room::new(&id, &user);
|
||||||
store.store_room_state(&PATH, &room).unwrap();
|
store.store_room_state(&PATH, &room).await.unwrap();
|
||||||
let loaded = store.load_room_state(&PATH, &id).unwrap();
|
let loaded = store.load_room_state(&PATH, &id).await.unwrap();
|
||||||
assert_eq!(loaded, Room::new(&id, &user));
|
assert_eq!(loaded, Room::new(&id, &user));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[tokio::test]
|
||||||
fn store_room_state() {
|
async fn store_room_state() {
|
||||||
run_and_cleanup(test_store_room_state);
|
run_and_cleanup(test_store_room_state).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_load_rooms() {
|
async fn test_load_rooms() {
|
||||||
let store = JsonStore;
|
let store = JsonStore;
|
||||||
|
|
||||||
let id = RoomId::try_from("!roomid:example.com").unwrap();
|
let id = RoomId::try_from("!roomid:example.com").unwrap();
|
||||||
let user = UserId::try_from("@example:example.com").unwrap();
|
let user = UserId::try_from("@example:example.com").unwrap();
|
||||||
|
|
||||||
let room = Room::new(&id, &user);
|
let room = Room::new(&id, &user);
|
||||||
store.store_room_state(&PATH, &room).unwrap();
|
store.store_room_state(&PATH, &room).await.unwrap();
|
||||||
let loaded = store.load_all_rooms(&PATH).unwrap();
|
let loaded = store.load_all_rooms(&PATH).await.unwrap();
|
||||||
println!("{:?}", loaded);
|
assert_eq!(&room, loaded.get(&id).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[tokio::test]
|
||||||
fn load_rooms() {
|
async fn load_rooms() {
|
||||||
run_and_cleanup(test_load_rooms);
|
run_and_cleanup(test_load_rooms).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn test_client_sync_store() {
|
||||||
|
let homeserver = url::Url::from_str(&mockito::server_url()).unwrap();
|
||||||
|
|
||||||
|
let session = Session {
|
||||||
|
access_token: "1234".to_owned(),
|
||||||
|
user_id: UserId::try_from("@cheeky_monkey:matrix.org").unwrap(),
|
||||||
|
device_id: "DEVICEID".to_owned(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let _m = mock(
|
||||||
|
"GET",
|
||||||
|
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
|
||||||
|
)
|
||||||
|
.with_status(200)
|
||||||
|
.with_body_from_file("tests/data/sync.json")
|
||||||
|
.create();
|
||||||
|
|
||||||
|
let _m = mock("POST", "/_matrix/client/r0/login")
|
||||||
|
.with_status(200)
|
||||||
|
.with_body_from_file("tests/data/login_response.json")
|
||||||
|
.create();
|
||||||
|
|
||||||
|
let mut path = PATH.clone();
|
||||||
|
path.push(session.user_id.to_string());
|
||||||
|
// a sync response to populate our JSON store with user_id added to path
|
||||||
|
let config = AsyncClientConfig::default().state_store_path(&path);
|
||||||
|
let client =
|
||||||
|
AsyncClient::new_with_config(homeserver.clone(), Some(session.clone()), config)
|
||||||
|
.unwrap();
|
||||||
|
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000));
|
||||||
|
let _ = client.sync(sync_settings).await.unwrap();
|
||||||
|
|
||||||
|
// remove user_id as login will set this
|
||||||
|
path.pop();
|
||||||
|
// once logged in without syncing the client is updated from the state store
|
||||||
|
let config = AsyncClientConfig::default().state_store_path(&path);
|
||||||
|
let client = AsyncClient::new_with_config(homeserver, None, config).unwrap();
|
||||||
|
client
|
||||||
|
.login("example", "wordpass", None, None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let base_client = client.base_client.read().await;
|
||||||
|
|
||||||
|
// assert the synced client and the logged in client are equal
|
||||||
|
assert_eq!(base_client.session, Some(session));
|
||||||
|
assert_eq!(
|
||||||
|
base_client.sync_token,
|
||||||
|
Some("s526_47314_0_7_1_1_1_11444_1".to_string())
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
base_client.ignored_users,
|
||||||
|
vec![UserId::try_from("@someone:example.org").unwrap()]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn client_sync_store() {
|
||||||
|
run_and_cleanup(test_client_sync_store).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ use crate::events::{
|
||||||
use crate::identifiers::{RoomId, UserId};
|
use crate::identifiers::{RoomId, UserId};
|
||||||
use crate::AsyncClient;
|
use crate::AsyncClient;
|
||||||
|
|
||||||
use mockito::{self, mock, Mock};
|
use mockito::{self, mock, Matcher, Mock};
|
||||||
|
|
||||||
use crate::models::Room;
|
use crate::models::Room;
|
||||||
|
|
||||||
|
@ -169,11 +169,7 @@ impl EventBuilder {
|
||||||
///
|
///
|
||||||
/// The `TestRunner` streams the events to the client and holds methods to make assertions
|
/// The `TestRunner` streams the events to the client and holds methods to make assertions
|
||||||
/// about the state of the client.
|
/// about the state of the client.
|
||||||
pub fn build_mock_runner<P: Into<mockito::Matcher>>(
|
pub fn build_mock_runner<P: Into<Matcher>>(mut self, method: &str, path: P) -> MockTestRunner {
|
||||||
mut self,
|
|
||||||
method: &str,
|
|
||||||
path: P,
|
|
||||||
) -> MockTestRunner {
|
|
||||||
let body = serde_json::json! {
|
let body = serde_json::json! {
|
||||||
{
|
{
|
||||||
"device_one_time_keys_count": {},
|
"device_one_time_keys_count": {},
|
||||||
|
@ -344,8 +340,12 @@ impl ClientTestRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
for event in &self.room_events {
|
for event in &self.room_events {
|
||||||
cli.receive_joined_timeline_event(room_id, &mut EventResult::Ok(event.clone()))
|
cli.receive_joined_timeline_event(
|
||||||
.await;
|
room_id,
|
||||||
|
&mut EventResult::Ok(event.clone()),
|
||||||
|
&mut false,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
for event in &self.presence_events {
|
for event in &self.presence_events {
|
||||||
cli.receive_presence_event(room_id, event).await;
|
cli.receive_presence_event(room_id, event).await;
|
||||||
|
|
Loading…
Reference in New Issue