base: First working version of the new state store.

master
Damir Jelić 2020-10-24 20:01:39 +02:00
parent 9ce7feea1a
commit c1e679147d
6 changed files with 406 additions and 224 deletions

View File

@ -40,7 +40,7 @@ use zeroize::Zeroizing;
use tracing::{debug, warn}; use tracing::{debug, warn};
use tracing::{error, info, instrument}; use tracing::{error, info, instrument};
use matrix_sdk_base::{BaseClient, BaseClientConfig, Room, Session, StateStore}; use matrix_sdk_base::{BaseClient, BaseClientConfig, RoomSummary, Session};
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use matrix_sdk_base::crypto::{ use matrix_sdk_base::crypto::{
@ -117,7 +117,7 @@ use matrix_sdk_common::{
use crate::{ use crate::{
http_client::{client_with_config, HttpClient, HttpSend}, http_client::{client_with_config, HttpClient, HttpSend},
Error, EventEmitter, OutgoingRequest, Result, Error, OutgoingRequest, Result,
}; };
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
@ -246,13 +246,13 @@ impl ClientConfig {
Ok(self) Ok(self)
} }
/// Set a custom implementation of a `StateStore`. ///// Set a custom implementation of a `StateStore`.
/// /////
/// The state store should be opened before being set. ///// The state store should be opened before being set.
pub fn state_store(mut self, store: Box<dyn StateStore>) -> Self { //pub fn state_store(mut self, store: Box<dyn StateStore>) -> Self {
self.base_config = self.base_config.state_store(store); // self.base_config = self.base_config.state_store(store);
self // self
} //}
/// Set the path for storage. /// Set the path for storage.
/// ///
@ -429,65 +429,54 @@ impl Client {
session.as_ref().cloned().map(|s| s.user_id) session.as_ref().cloned().map(|s| s.user_id)
} }
/// Add `EventEmitter` to `Client`. ///// Add `EventEmitter` to `Client`.
/// /////
/// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. ///// The methods of `EventEmitter` are called when the respective `RoomEvents` occur.
pub async fn add_event_emitter(&mut self, emitter: Box<dyn EventEmitter>) { //pub async fn add_event_emitter(&mut self, emitter: Box<dyn EventEmitter>) {
self.base_client.add_event_emitter(emitter).await; // self.base_client.add_event_emitter(emitter).await;
} //}
/// Returns the joined rooms this client knows about. // /// Returns the joined rooms this client knows about.
pub fn joined_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<RwLock<Room>>>>> { // pub fn joined_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<RwLock<Room>>>>> {
self.base_client.joined_rooms() // self.base_client.joined_rooms()
} // }
/// Returns the invited rooms this client knows about. // /// Returns the invited rooms this client knows about.
pub fn invited_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<RwLock<Room>>>>> { // pub fn invited_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<RwLock<Room>>>>> {
self.base_client.invited_rooms() // self.base_client.invited_rooms()
} // }
/// Returns the left rooms this client knows about. // /// Returns the left rooms this client knows about.
pub fn left_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<RwLock<Room>>>>> { // pub fn left_rooms(&self) -> Arc<RwLock<HashMap<RoomId, Arc<RwLock<Room>>>>> {
self.base_client.left_rooms() // self.base_client.left_rooms()
} // }
/// Get a joined room with the given room id. /// Get a joined room with the given room id.
/// ///
/// # Arguments /// # Arguments
/// ///
/// `room_id` - The unique id of the room that should be fetched. /// `room_id` - The unique id of the room that should be fetched.
pub async fn get_joined_room(&self, room_id: &RoomId) -> Option<Arc<RwLock<Room>>> { pub fn get_joined_room(&self, room_id: &RoomId) -> Option<RoomSummary> {
self.base_client.get_joined_room(room_id).await self.base_client.get_joined_room(room_id)
} }
/// Get an invited room with the given room id. ///// Get an invited room with the given room id.
/// /////
/// # Arguments ///// # Arguments
/// /////
/// `room_id` - The unique id of the room that should be fetched. ///// `room_id` - The unique id of the room that should be fetched.
pub async fn get_invited_room(&self, room_id: &RoomId) -> Option<Arc<RwLock<Room>>> { //pub async fn get_invited_room(&self, room_id: &RoomId) -> Option<Arc<RwLock<Room>>> {
self.base_client.get_invited_room(room_id).await // self.base_client.get_invited_room(room_id).await
} //}
/// Get a left room with the given room id. ///// Get a left room with the given room id.
/// /////
/// # Arguments ///// # Arguments
/// /////
/// `room_id` - The unique id of the room that should be fetched. ///// `room_id` - The unique id of the room that should be fetched.
pub async fn get_left_room(&self, room_id: &RoomId) -> Option<Arc<RwLock<Room>>> { //pub async fn get_left_room(&self, room_id: &RoomId) -> Option<Arc<RwLock<Room>>> {
self.base_client.get_left_room(room_id).await // self.base_client.get_left_room(room_id).await
} //}
/// This allows `Client` to manually store `Room` state with the provided
/// `StateStore`.
///
/// Returns Ok when a successful `Room` store occurs.
pub async fn store_room_state(&self, room_id: &RoomId) -> Result<()> {
self.base_client
.store_room_state(room_id)
.await
.map_err(Into::into)
}
/// Login to the server. /// Login to the server.
/// ///
@ -1021,13 +1010,10 @@ impl Client {
let _guard = mutex.lock().await; let _guard = mutex.lock().await;
{ {
let room = self.base_client.get_joined_room(room_id).await; let room = self.base_client.get_joined_room(room_id).unwrap();
let room = room.as_ref().unwrap().read().await; let members = room.joined_user_ids().await;
let mut members = room let mut members_iter = members.iter();
.joined_members self.claim_one_time_keys(&mut members_iter).await?;
.keys()
.chain(room.invited_members.keys());
self.claim_one_time_keys(&mut members).await?;
}; };
let response = self.share_group_session(room_id).await; let response = self.share_group_session(room_id).await;
@ -1120,10 +1106,8 @@ impl Client {
/// Returns true if a room with the given id was found and the room is /// Returns true if a room with the given id was found and the room is
/// encrypted, false if the room wasn't found or isn't encrypted. /// encrypted, false if the room wasn't found or isn't encrypted.
async fn is_room_encrypted(&self, room_id: &RoomId) -> bool { async fn is_room_encrypted(&self, room_id: &RoomId) -> bool {
let room = self.base_client.get_joined_room(room_id).await; match self.base_client.get_joined_room(room_id) {
Some(r) => r.is_encrypted(),
match room {
Some(r) => r.read().await.is_encrypted(),
None => false, None => false,
} }
} }
@ -1937,7 +1921,6 @@ mod test {
get_public_rooms, get_public_rooms_filtered, register::RegistrationKind, Client, get_public_rooms, get_public_rooms_filtered, register::RegistrationKind, Client,
ClientConfig, Invite3pid, Session, SyncSettings, Url, ClientConfig, Invite3pid, Session, SyncSettings, Url,
}; };
use matrix_sdk_base::JsonStore;
use matrix_sdk_common::{ use matrix_sdk_common::{
api::r0::{ api::r0::{
account::register::Request as RegistrationRequest, account::register::Request as RegistrationRequest,
@ -2005,78 +1988,78 @@ mod test {
assert!(client.devices().await.is_ok()); assert!(client.devices().await.is_ok());
} }
#[tokio::test] // #[tokio::test]
async fn test_join_leave_room() { // async fn test_join_leave_room() {
let homeserver = Url::from_str(&mockito::server_url()).unwrap(); // let homeserver = Url::from_str(&mockito::server_url()).unwrap();
let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); // let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
let session = Session { // let session = Session {
access_token: "1234".to_owned(), // access_token: "1234".to_owned(),
user_id: user_id!("@example:localhost"), // user_id: user_id!("@example:localhost"),
device_id: "DEVICEID".into(), // device_id: "DEVICEID".into(),
}; // };
let _m = mock( // let _m = mock(
"GET", // "GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), // Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
) // )
.with_status(200) // .with_status(200)
.with_body(test_json::SYNC.to_string()) // .with_body(test_json::SYNC.to_string())
.create(); // .create();
let dir = tempdir().unwrap(); // let dir = tempdir().unwrap();
let path: &Path = dir.path(); // let path: &Path = dir.path();
let store = Box::new(JsonStore::open(path).unwrap()); // let store = Box::new(JsonStore::open(path).unwrap());
let config = ClientConfig::default().state_store(store); // let config = ClientConfig::default().state_store(store);
let client = Client::new_with_config(homeserver.clone(), config).unwrap(); // let client = Client::new_with_config(homeserver.clone(), config).unwrap();
client.restore_login(session.clone()).await.unwrap(); // client.restore_login(session.clone()).await.unwrap();
let room = client.get_joined_room(&room_id).await; // let room = client.get_joined_room(&room_id).await;
assert!(room.is_none()); // assert!(room.is_none());
client.sync_once(SyncSettings::default()).await.unwrap(); // client.sync_once(SyncSettings::default()).await.unwrap();
let room = client.get_left_room(&room_id).await; // let room = client.get_left_room(&room_id).await;
assert!(room.is_none()); // assert!(room.is_none());
let room = client.get_joined_room(&room_id).await; // let room = client.get_joined_room(&room_id).await;
assert!(room.is_some()); // assert!(room.is_some());
// test store reloads with correct room state from JsonStore // // test store reloads with correct room state from JsonStore
let store = Box::new(JsonStore::open(path).unwrap()); // let store = Box::new(JsonStore::open(path).unwrap());
let config = ClientConfig::default().state_store(store); // let config = ClientConfig::default().state_store(store);
let joined_client = Client::new_with_config(homeserver, config).unwrap(); // let joined_client = Client::new_with_config(homeserver, config).unwrap();
joined_client.restore_login(session).await.unwrap(); // joined_client.restore_login(session).await.unwrap();
// joined room reloaded from state store // // joined room reloaded from state store
joined_client // joined_client
.sync_once(SyncSettings::default()) // .sync_once(SyncSettings::default())
.await // .await
.unwrap(); // .unwrap();
let room = joined_client.get_joined_room(&room_id).await; // let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_some()); // assert!(room.is_some());
let _m = mock( // let _m = mock(
"GET", // "GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), // Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
) // )
.with_status(200) // .with_status(200)
.with_body(test_json::LEAVE_SYNC_EVENT.to_string()) // .with_body(test_json::LEAVE_SYNC_EVENT.to_string())
.create(); // .create();
joined_client // joined_client
.sync_once(SyncSettings::default()) // .sync_once(SyncSettings::default())
.await // .await
.unwrap(); // .unwrap();
let room = joined_client.get_joined_room(&room_id).await; // let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_none()); // assert!(room.is_none());
let room = joined_client.get_left_room(&room_id).await; // let room = joined_client.get_left_room(&room_id).await;
assert!(room.is_some()); // assert!(room.is_some());
} // }
#[tokio::test] #[tokio::test]
async fn account_data() { async fn account_data() {
@ -2649,62 +2632,62 @@ mod test {
.is_some()) .is_some())
} }
#[tokio::test] // #[tokio::test]
async fn test_client_sync_store() { // async fn test_client_sync_store() {
let homeserver = url::Url::from_str(&mockito::server_url()).unwrap(); // let homeserver = url::Url::from_str(&mockito::server_url()).unwrap();
let session = Session { // let session = Session {
access_token: "1234".to_owned(), // access_token: "1234".to_owned(),
user_id: user_id!("@cheeky_monkey:matrix.org"), // user_id: user_id!("@cheeky_monkey:matrix.org"),
device_id: "DEVICEID".into(), // device_id: "DEVICEID".into(),
}; // };
let _m = mock( // let _m = mock(
"GET", // "GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), // Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
) // )
.with_status(200) // .with_status(200)
.with_body(test_json::SYNC.to_string()) // .with_body(test_json::SYNC.to_string())
.create(); // .create();
let _m = mock("POST", "/_matrix/client/r0/login") // let _m = mock("POST", "/_matrix/client/r0/login")
.with_status(200) // .with_status(200)
.with_body(test_json::LOGIN.to_string()) // .with_body(test_json::LOGIN.to_string())
.create(); // .create();
let dir = tempdir().unwrap(); // let dir = tempdir().unwrap();
// a sync response to populate our JSON store // // a sync response to populate our JSON store
let config = // let config =
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); // ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));
let client = Client::new_with_config(homeserver.clone(), config).unwrap(); // let client = Client::new_with_config(homeserver.clone(), config).unwrap();
client.restore_login(session.clone()).await.unwrap(); // client.restore_login(session.clone()).await.unwrap();
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); // let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000));
// gather state to save to the db, the first time through loading will be skipped // // gather state to save to the db, the first time through loading will be skipped
let _ = client.sync_once(sync_settings.clone()).await.unwrap(); // let _ = client.sync_once(sync_settings.clone()).await.unwrap();
// now syncing the client will update from the state store // // now syncing the client will update from the state store
let config = // let config =
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); // ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));
let client = Client::new_with_config(homeserver, config).unwrap(); // let client = Client::new_with_config(homeserver, config).unwrap();
client.restore_login(session.clone()).await.unwrap(); // client.restore_login(session.clone()).await.unwrap();
client.sync_once(sync_settings).await.unwrap(); // client.sync_once(sync_settings).await.unwrap();
let base_client = &client.base_client; // let base_client = &client.base_client;
// assert the synced client and the logged in client are equal // // assert the synced client and the logged in client are equal
assert_eq!(*base_client.session().read().await, Some(session)); // assert_eq!(*base_client.session().read().await, Some(session));
assert_eq!( // assert_eq!(
base_client.sync_token().await, // base_client.sync_token().await,
Some("s526_47314_0_7_1_1_1_11444_1".to_string()) // Some("s526_47314_0_7_1_1_1_11444_1".to_string())
); // );
// This is commented out because this field is private... // // This is commented out because this field is private...
// assert_eq!( // // assert_eq!(
// *base_client.ignored_users.read().await, // // *base_client.ignored_users.read().await,
// vec![user_id!("@someone:example.org")] // // vec![user_id!("@someone:example.org")]
// ); // // );
} // }
#[tokio::test] #[tokio::test]
async fn sync() { async fn sync() {

View File

@ -66,16 +66,7 @@ compile_error!("only one of 'native-tls' or 'rustls-tls' features can be enabled
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))] #[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub use matrix_sdk_base::crypto::LocalTrust; pub use matrix_sdk_base::crypto::LocalTrust;
#[cfg(not(target_arch = "wasm32"))] pub use matrix_sdk_base::{Error as BaseError, RoomSummary, Session};
pub use matrix_sdk_base::JsonStore;
pub use matrix_sdk_base::{
CustomEvent, Error as BaseError, EventEmitter, Room, RoomMember, RoomState, Session,
StateStore, SyncRoom,
};
#[cfg(feature = "messages")]
#[cfg_attr(feature = "docs", doc(cfg(messages)))]
pub use matrix_sdk_base::{MessageQueue, PossiblyRedactedExt};
pub use matrix_sdk_common::*; pub use matrix_sdk_common::*;
pub use reqwest; pub use reqwest;

View File

@ -26,6 +26,7 @@ docs = ["encryption", "sqlite_cryptostore", "messages"]
[dependencies] [dependencies]
async-trait = "0.1.41" async-trait = "0.1.41"
serde = "1.0.116" serde = "1.0.116"
dashmap= "*"
serde_json = "1.0.58" serde_json = "1.0.58"
zeroize = "1.1.1" zeroize = "1.1.1"
tracing = "0.1.21" tracing = "0.1.21"
@ -37,6 +38,7 @@ matrix-sdk-crypto = { version = "0.1.0", path = "../matrix_sdk_crypto", optional
# Misc dependencies # Misc dependencies
thiserror = "1.0.21" thiserror = "1.0.21"
sled = "0.34.4" sled = "0.34.4"
futures = "0.3.6"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "0.2.22" version = "0.2.22"
@ -44,7 +46,6 @@ default-features = false
features = ["sync", "fs"] features = ["sync", "fs"]
[dev-dependencies] [dev-dependencies]
futures = "0.3.6"
matrix-sdk-test = { version = "0.1.0", path = "../matrix_sdk_test" } matrix-sdk-test = { version = "0.1.0", path = "../matrix_sdk_test" }
http = "0.2.1" http = "0.2.1"
tracing-subscriber = "0.2.13" tracing-subscriber = "0.2.13"

View File

@ -21,6 +21,8 @@ use std::{
sync::Arc, sync::Arc,
}; };
use dashmap::DashMap;
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use matrix_sdk_common::locks::Mutex; use matrix_sdk_common::locks::Mutex;
use matrix_sdk_common::{ use matrix_sdk_common::{
@ -43,15 +45,16 @@ use matrix_sdk_common::{
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use matrix_sdk_crypto::{ use matrix_sdk_crypto::{
store::{CryptoStore, CryptoStoreError}, store::{CryptoStore, CryptoStoreError},
Device, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas, ToDeviceRequest, Device, EncryptionSettings, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas,
UserDevices, ToDeviceRequest, UserDevices,
}; };
use tracing::info;
use zeroize::Zeroizing; use zeroize::Zeroizing;
use crate::{ use crate::{
error::Result, error::Result,
session::Session, session::Session,
store::{StateChanges, Store}, store::{RoomSummary, StateChanges, Store},
}; };
pub type Token = String; pub type Token = String;
@ -180,6 +183,7 @@ pub struct BaseClient {
pub(crate) sync_token: Arc<RwLock<Option<Token>>>, pub(crate) sync_token: Arc<RwLock<Option<Token>>>,
/// Database /// Database
store: Store, store: Store,
joined_rooms: Arc<DashMap<RoomId, RoomSummary>>,
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
olm: Arc<Mutex<Option<OlmMachine>>>, olm: Arc<Mutex<Option<OlmMachine>>>,
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
@ -282,10 +286,18 @@ impl BaseClient {
/// * `config` - An optional session if the user already has one from a /// * `config` - An optional session if the user already has one from a
/// previous login call. /// previous login call.
pub fn new_with_config(config: BaseClientConfig) -> Result<Self> { pub fn new_with_config(config: BaseClientConfig) -> Result<Self> {
let store = if let Some(path) = &config.store_path {
info!("Opening store in path {}", path.display());
Store::open_with_path(path)
} else {
Store::open()
};
Ok(BaseClient { Ok(BaseClient {
session: Arc::new(RwLock::new(None)), session: Arc::new(RwLock::new(None)),
sync_token: Arc::new(RwLock::new(None)), sync_token: Arc::new(RwLock::new(None)),
store: Store::open(), store,
joined_rooms: Arc::new(DashMap::new()),
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
olm: Arc::new(Mutex::new(None)), olm: Arc::new(Mutex::new(None)),
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
@ -429,27 +441,45 @@ impl BaseClient {
} }
} }
let changes = StateChanges::default(); let mut changes = StateChanges::default();
for (room_id, room) in &response.rooms.join { for (room_id, room) in &response.rooms.join {
for e in &room.state.events { for e in &room.state.events {
// info!("Handling event for room {} {:#?}", room_id, e);
if let Ok(event) = hoist_and_deserialize_state_event(e) { if let Ok(event) = hoist_and_deserialize_state_event(e) {
match event { match event {
AnySyncStateEvent::RoomMember(member) => { AnySyncStateEvent::RoomMember(member) => {
let member_id = UserId::try_from(member.state_key).unwrap(); // let member_id = UserId::try_from(member.state_key.clone()).unwrap();
let prev_member = // self.store.get_member_event(room_id, &member_id).await;
self.store.get_member_event(room_id, &member_id).await;
use matrix_sdk_common::events::room::member::MembershipState::*; use matrix_sdk_common::events::room::member::MembershipState::*;
match member.content.membership { // TODO this isn't right, check the diff against
// your previous state.
match &member.content.membership {
Join => { Join => {
info!("ADDING MEMBER {} to {}", member.state_key, room_id);
changes.add_joined_member(room_id, member)
// TODO check if the display name is // TODO check if the display name is
// ambigous // ambigous
} }
Invited => {
info!(
"ADDING INVITED MEMBER {} to {}",
member.state_key, room_id
);
changes.add_invited_member(room_id, member)
}
_ => (), _ => (),
} }
} }
_ => (), AnySyncStateEvent::RoomCreate(create) => {
info!("Creating new room {}", room_id);
let room =
RoomSummary::new(self.store.clone(), room_id, &create.content);
self.joined_rooms.insert(room_id.clone(), room.clone());
changes.add_room_summary(room_id.clone(), room);
}
_ => changes.add_state_event(room_id, event),
} }
} }
} }
@ -542,28 +572,27 @@ impl BaseClient {
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))] #[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn share_group_session(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> { pub async fn share_group_session(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
todo!() let olm = self.olm.lock().await;
// let room = self.get_joined_room(room_id).await.expect("No room found");
// let olm = self.olm.lock().await;
// match &*olm { match &*olm {
// Some(o) => { Some(o) => {
// let room = room.write().await; // XXX: We construct members in a slightly roundabout way instead of chaining the
// iterators directly because of https://github.com/rust-lang/rust/issues/64552
// let joined_members = room.joined_members.keys();
// let invited_members = room.joined_members.keys();
// let members: Vec<&UserId> = joined_members.chain(invited_members).collect();
let members = self.store.get_joined_members(room_id).await;
Ok(
o.share_group_session(room_id, members.iter(), EncryptionSettings::default())
.await?,
)
}
None => panic!("Olm machine wasn't started"),
}
}
// // XXX: We construct members in a slightly roundabout way instead of chaining the pub fn get_joined_room(&self, room_id: &RoomId) -> Option<RoomSummary> {
// // iterators directly because of https://github.com/rust-lang/rust/issues/64552 self.joined_rooms.get(room_id).map(|r| r.clone())
// let joined_members = room.joined_members.keys();
// let invited_members = room.joined_members.keys();
// let members: Vec<&UserId> = joined_members.chain(invited_members).collect();
// Ok(o.share_group_session(
// room_id,
// members.into_iter(),
// room.encrypted.clone().unwrap_or_default(),
// )
// .await?)
// }
// None => panic!("Olm machine wasn't started"),
// }
} }
/// Encrypt a message event content. /// Encrypt a message event content.

View File

@ -47,7 +47,7 @@ mod error;
mod session; mod session;
mod store; mod store;
pub use store::Store; pub use store::{RoomSummary, Store};
pub use client::{BaseClient, BaseClientConfig, RoomState, RoomStateType}; pub use client::{BaseClient, BaseClientConfig, RoomState, RoomStateType};

View File

@ -1,9 +1,22 @@
use std::{collections::BTreeMap, convert::TryFrom}; use std::{
collections::BTreeMap,
convert::TryFrom,
path::Path,
sync::{Arc, Mutex as SyncMutex},
};
use futures::executor::block_on;
use matrix_sdk_common::{ use matrix_sdk_common::{
events::{room::member::MemberEventContent, AnySyncStateEvent, SyncStateEvent}, events::{
room::{
create::CreateEventContent, encryption::EncryptionEventContent,
member::MemberEventContent,
},
AnySyncStateEvent, SyncStateEvent,
},
identifiers::{RoomId, UserId}, identifiers::{RoomId, UserId},
}; };
use serde::{Deserialize, Serialize};
use serde_json; use serde_json;
use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree}; use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree};
@ -13,7 +26,9 @@ pub struct Store {
inner: Db, inner: Db,
session: Tree, session: Tree,
members: Tree, members: Tree,
joined_user_ids: Tree,
room_state: Tree, room_state: Tree,
room_summaries: Tree,
} }
use crate::Session; use crate::Session;
@ -23,9 +38,9 @@ pub struct StateChanges {
session: Option<Session>, session: Option<Session>,
members: BTreeMap<RoomId, BTreeMap<UserId, SyncStateEvent<MemberEventContent>>>, members: BTreeMap<RoomId, BTreeMap<UserId, SyncStateEvent<MemberEventContent>>>,
state: BTreeMap<RoomId, BTreeMap<String, AnySyncStateEvent>>, state: BTreeMap<RoomId, BTreeMap<String, AnySyncStateEvent>>,
display_names: BTreeMap<RoomId, BTreeMap<String, BTreeMap<UserId, ()>>>, room_summaries: BTreeMap<RoomId, RoomSummary>,
// display_names: BTreeMap<RoomId, BTreeMap<String, BTreeMap<UserId, ()>>>,
added_user_ids: BTreeMap<RoomId, UserId>, joined_user_ids: BTreeMap<RoomId, UserId>,
invited_user_ids: BTreeMap<RoomId, UserId>, invited_user_ids: BTreeMap<RoomId, UserId>,
removed_user_ids: BTreeMap<RoomId, UserId>, removed_user_ids: BTreeMap<RoomId, UserId>,
} }
@ -37,12 +52,39 @@ impl StateChanges {
event: SyncStateEvent<MemberEventContent>, event: SyncStateEvent<MemberEventContent>,
) { ) {
let user_id = UserId::try_from(event.state_key.as_str()).unwrap(); let user_id = UserId::try_from(event.state_key.as_str()).unwrap();
self.joined_user_ids
.insert(room_id.to_owned(), user_id.clone());
self.members self.members
.entry(room_id.to_owned()) .entry(room_id.to_owned())
.or_insert_with(BTreeMap::new) .or_insert_with(BTreeMap::new)
.insert(user_id, event); .insert(user_id, event);
} }
pub fn add_invited_member(
&mut self,
room_id: &RoomId,
event: SyncStateEvent<MemberEventContent>,
) {
let user_id = UserId::try_from(event.state_key.as_str()).unwrap();
self.invited_user_ids
.insert(room_id.to_owned(), user_id.clone());
self.members
.entry(room_id.to_owned())
.or_insert_with(BTreeMap::new)
.insert(user_id, event);
}
pub fn add_room_summary(&mut self, room_id: RoomId, summary: RoomSummary) {
self.room_summaries.insert(room_id, summary);
}
pub fn add_state_event(&mut self, room_id: &RoomId, event: AnySyncStateEvent) {
self.state
.entry(room_id.to_owned())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_string(), event);
}
pub fn from_event(room_id: &RoomId, event: SyncStateEvent<MemberEventContent>) -> Self { pub fn from_event(room_id: &RoomId, event: SyncStateEvent<MemberEventContent>) -> Self {
let mut changes = Self::default(); let mut changes = Self::default();
changes.add_joined_member(room_id, event); changes.add_joined_member(room_id, event);
@ -60,24 +102,136 @@ impl From<Session> for StateChanges {
} }
} }
#[derive(Debug, Clone)]
pub struct RoomSummary {
room_id: Arc<RoomId>,
inner: Arc<SyncMutex<InnerSummary>>,
store: Store,
}
impl RoomSummary {
pub fn new(store: Store, room_id: &RoomId, creation_event: &CreateEventContent) -> Self {
let room_id = Arc::new(room_id.clone());
Self {
room_id: room_id.clone(),
store,
inner: Arc::new(SyncMutex::new(InnerSummary {
creation_content: creation_event.clone(),
room_id,
encryption: None,
last_prev_batch: None,
})),
}
}
fn serialize(&self) -> Vec<u8> {
let inner = self.inner.lock().unwrap();
serde_json::to_vec(&*inner).unwrap()
}
pub async fn joined_user_ids(&self) -> Vec<UserId> {
self.store.get_joined_members(&self.room_id).await
}
pub fn is_encrypted(&self) -> bool {
self.inner.lock().unwrap().encryption.is_some()
}
pub fn get_member(&self, user_id: &UserId) -> Option<RoomMember> {
block_on(self.store.get_member_event(&self.room_id, user_id)).map(|e| e.into())
}
pub fn room_id(&self) -> &RoomId {
&self.room_id
}
pub fn display_name(&self) -> String {
"TEST ROOM NAME".to_string()
}
}
#[derive(Debug)]
pub struct RoomMember {
event: Arc<SyncStateEvent<MemberEventContent>>,
}
impl RoomMember {
pub fn display_name(&self) -> &Option<String> {
&self.event.content.displayname
}
pub fn disambiguated_name(&self) -> String {
self.event.state_key.clone()
}
pub fn name(&self) -> String {
self.event.state_key.clone()
}
pub fn unique_name(&self) -> String {
self.event.state_key.clone()
}
}
impl From<SyncStateEvent<MemberEventContent>> for RoomMember {
fn from(event: SyncStateEvent<MemberEventContent>) -> Self {
Self {
event: Arc::new(event),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct InnerSummary {
room_id: Arc<RoomId>,
creation_content: CreateEventContent,
encryption: Option<EncryptionEventContent>,
last_prev_batch: Option<String>,
}
impl Store { impl Store {
pub fn open() -> Self { fn open_helper(db: Db) -> Self {
let db = Config::new().temporary(true).open().unwrap();
let session = db.open_tree("session").unwrap(); let session = db.open_tree("session").unwrap();
let members = db.open_tree("members").unwrap(); let members = db.open_tree("members").unwrap();
let room_state = db.open_tree("members").unwrap(); let joined_user_ids = db.open_tree("joined_user_ids").unwrap();
let room_state = db.open_tree("room_state").unwrap();
let room_summaries = db.open_tree("room_summaries").unwrap();
Self { Self {
inner: db, inner: db,
session, session,
members, members,
joined_user_ids,
room_state, room_state,
room_summaries,
} }
} }
pub fn open() -> Self {
let db = Config::new().temporary(true).open().unwrap();
Store::open_helper(db)
}
pub fn open_with_path(path: impl AsRef<Path>) -> Self {
let path = path.as_ref().join("matrix-sdk-state");
let db = Config::new().temporary(false).path(path).open().unwrap();
Store::open_helper(db)
}
pub async fn save_changes(&self, changes: &StateChanges) { pub async fn save_changes(&self, changes: &StateChanges) {
let ret: TransactionResult<()> = let ret: TransactionResult<()> = (
(&self.session, &self.members).transaction(|(session, members)| { &self.session,
&self.members,
&self.joined_user_ids,
&self.room_state,
&self.room_summaries,
)
.transaction(|(session, members, joined, state, summaries)| {
if let Some(s) = &changes.session { if let Some(s) = &changes.session {
session.insert("session", serde_json::to_vec(s).unwrap())?; session.insert("session", serde_json::to_vec(s).unwrap())?;
} }
@ -91,6 +245,23 @@ impl Store {
} }
} }
for (room, user) in &changes.joined_user_ids {
joined.insert(room.as_bytes(), user.as_bytes())?;
}
for (room, events) in &changes.state {
for (state_key, event) in events {
state.insert(
format!("{}{}", room.as_str(), state_key).as_bytes(),
serde_json::to_vec(&event).unwrap(),
)?;
}
}
for (room_id, summary) in &changes.room_summaries {
summaries.insert(room_id.as_str().as_bytes(), summary.serialize())?;
}
Ok(()) Ok(())
}); });
@ -110,6 +281,13 @@ impl Store {
.map(|v| serde_json::from_slice(&v).unwrap()) .map(|v| serde_json::from_slice(&v).unwrap())
} }
pub async fn get_joined_members(&self, room_id: &RoomId) -> Vec<UserId> {
self.joined_user_ids
.scan_prefix(room_id.as_bytes())
.map(|u| UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap())
.collect()
}
pub fn get_session(&self) -> Option<Session> { pub fn get_session(&self) -> Option<Session> {
self.session self.session
.get("session") .get("session")