From c1e679147dd245152554aa2f8f3e1f9262b7ceea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Sat, 24 Oct 2020 20:01:39 +0200 Subject: [PATCH] base: First working version of the new state store. --- matrix_sdk/src/client.rs | 327 ++++++++++++++++------------------ matrix_sdk/src/lib.rs | 11 +- matrix_sdk_base/Cargo.toml | 3 +- matrix_sdk_base/src/client.rs | 89 +++++---- matrix_sdk_base/src/lib.rs | 2 +- matrix_sdk_base/src/store.rs | 198 ++++++++++++++++++-- 6 files changed, 406 insertions(+), 224 deletions(-) diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index b667e4a3..c4cf808f 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -40,7 +40,7 @@ use zeroize::Zeroizing; use tracing::{debug, warn}; use tracing::{error, info, instrument}; -use matrix_sdk_base::{BaseClient, BaseClientConfig, Room, Session, StateStore}; +use matrix_sdk_base::{BaseClient, BaseClientConfig, RoomSummary, Session}; #[cfg(feature = "encryption")] use matrix_sdk_base::crypto::{ @@ -117,7 +117,7 @@ use matrix_sdk_common::{ use crate::{ http_client::{client_with_config, HttpClient, HttpSend}, - Error, EventEmitter, OutgoingRequest, Result, + Error, OutgoingRequest, Result, }; #[cfg(feature = "encryption")] @@ -246,13 +246,13 @@ impl ClientConfig { Ok(self) } - /// Set a custom implementation of a `StateStore`. - /// - /// The state store should be opened before being set. - pub fn state_store(mut self, store: Box) -> Self { - self.base_config = self.base_config.state_store(store); - self - } + ///// Set a custom implementation of a `StateStore`. + ///// + ///// The state store should be opened before being set. + //pub fn state_store(mut self, store: Box) -> Self { + // self.base_config = self.base_config.state_store(store); + // self + //} /// Set the path for storage. /// @@ -429,65 +429,54 @@ impl Client { session.as_ref().cloned().map(|s| s.user_id) } - /// Add `EventEmitter` to `Client`. - /// - /// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. - pub async fn add_event_emitter(&mut self, emitter: Box) { - self.base_client.add_event_emitter(emitter).await; - } + ///// Add `EventEmitter` to `Client`. + ///// + ///// The methods of `EventEmitter` are called when the respective `RoomEvents` occur. + //pub async fn add_event_emitter(&mut self, emitter: Box) { + // self.base_client.add_event_emitter(emitter).await; + //} - /// Returns the joined rooms this client knows about. - pub fn joined_rooms(&self) -> Arc>>>> { - self.base_client.joined_rooms() - } + // /// Returns the joined rooms this client knows about. + // pub fn joined_rooms(&self) -> Arc>>>> { + // self.base_client.joined_rooms() + // } - /// Returns the invited rooms this client knows about. - pub fn invited_rooms(&self) -> Arc>>>> { - self.base_client.invited_rooms() - } + // /// Returns the invited rooms this client knows about. + // pub fn invited_rooms(&self) -> Arc>>>> { + // self.base_client.invited_rooms() + // } - /// Returns the left rooms this client knows about. - pub fn left_rooms(&self) -> Arc>>>> { - self.base_client.left_rooms() - } + // /// Returns the left rooms this client knows about. + // pub fn left_rooms(&self) -> Arc>>>> { + // self.base_client.left_rooms() + // } /// Get a joined room with the given room id. /// /// # Arguments /// /// `room_id` - The unique id of the room that should be fetched. - pub async fn get_joined_room(&self, room_id: &RoomId) -> Option>> { - self.base_client.get_joined_room(room_id).await + pub fn get_joined_room(&self, room_id: &RoomId) -> Option { + self.base_client.get_joined_room(room_id) } - /// Get an invited room with the given room id. - /// - /// # Arguments - /// - /// `room_id` - The unique id of the room that should be fetched. - pub async fn get_invited_room(&self, room_id: &RoomId) -> Option>> { - self.base_client.get_invited_room(room_id).await - } + ///// Get an invited room with the given room id. + ///// + ///// # Arguments + ///// + ///// `room_id` - The unique id of the room that should be fetched. + //pub async fn get_invited_room(&self, room_id: &RoomId) -> Option>> { + // self.base_client.get_invited_room(room_id).await + //} - /// Get a left room with the given room id. - /// - /// # Arguments - /// - /// `room_id` - The unique id of the room that should be fetched. - pub async fn get_left_room(&self, room_id: &RoomId) -> Option>> { - self.base_client.get_left_room(room_id).await - } - - /// This allows `Client` to manually store `Room` state with the provided - /// `StateStore`. - /// - /// Returns Ok when a successful `Room` store occurs. - pub async fn store_room_state(&self, room_id: &RoomId) -> Result<()> { - self.base_client - .store_room_state(room_id) - .await - .map_err(Into::into) - } + ///// Get a left room with the given room id. + ///// + ///// # Arguments + ///// + ///// `room_id` - The unique id of the room that should be fetched. + //pub async fn get_left_room(&self, room_id: &RoomId) -> Option>> { + // self.base_client.get_left_room(room_id).await + //} /// Login to the server. /// @@ -1021,13 +1010,10 @@ impl Client { let _guard = mutex.lock().await; { - let room = self.base_client.get_joined_room(room_id).await; - let room = room.as_ref().unwrap().read().await; - let mut members = room - .joined_members - .keys() - .chain(room.invited_members.keys()); - self.claim_one_time_keys(&mut members).await?; + let room = self.base_client.get_joined_room(room_id).unwrap(); + let members = room.joined_user_ids().await; + let mut members_iter = members.iter(); + self.claim_one_time_keys(&mut members_iter).await?; }; let response = self.share_group_session(room_id).await; @@ -1120,10 +1106,8 @@ impl Client { /// Returns true if a room with the given id was found and the room is /// encrypted, false if the room wasn't found or isn't encrypted. async fn is_room_encrypted(&self, room_id: &RoomId) -> bool { - let room = self.base_client.get_joined_room(room_id).await; - - match room { - Some(r) => r.read().await.is_encrypted(), + match self.base_client.get_joined_room(room_id) { + Some(r) => r.is_encrypted(), None => false, } } @@ -1937,7 +1921,6 @@ mod test { get_public_rooms, get_public_rooms_filtered, register::RegistrationKind, Client, ClientConfig, Invite3pid, Session, SyncSettings, Url, }; - use matrix_sdk_base::JsonStore; use matrix_sdk_common::{ api::r0::{ account::register::Request as RegistrationRequest, @@ -2005,78 +1988,78 @@ mod test { assert!(client.devices().await.is_ok()); } - #[tokio::test] - async fn test_join_leave_room() { - let homeserver = Url::from_str(&mockito::server_url()).unwrap(); + // #[tokio::test] + // async fn test_join_leave_room() { + // let homeserver = Url::from_str(&mockito::server_url()).unwrap(); - let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + // let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); - let session = Session { - access_token: "1234".to_owned(), - user_id: user_id!("@example:localhost"), - device_id: "DEVICEID".into(), - }; + // let session = Session { + // access_token: "1234".to_owned(), + // user_id: user_id!("@example:localhost"), + // device_id: "DEVICEID".into(), + // }; - let _m = mock( - "GET", - Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), - ) - .with_status(200) - .with_body(test_json::SYNC.to_string()) - .create(); + // let _m = mock( + // "GET", + // Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), + // ) + // .with_status(200) + // .with_body(test_json::SYNC.to_string()) + // .create(); - let dir = tempdir().unwrap(); - let path: &Path = dir.path(); - let store = Box::new(JsonStore::open(path).unwrap()); + // let dir = tempdir().unwrap(); + // let path: &Path = dir.path(); + // let store = Box::new(JsonStore::open(path).unwrap()); - let config = ClientConfig::default().state_store(store); - let client = Client::new_with_config(homeserver.clone(), config).unwrap(); - client.restore_login(session.clone()).await.unwrap(); + // let config = ClientConfig::default().state_store(store); + // let client = Client::new_with_config(homeserver.clone(), config).unwrap(); + // client.restore_login(session.clone()).await.unwrap(); - let room = client.get_joined_room(&room_id).await; - assert!(room.is_none()); + // let room = client.get_joined_room(&room_id).await; + // assert!(room.is_none()); - client.sync_once(SyncSettings::default()).await.unwrap(); + // client.sync_once(SyncSettings::default()).await.unwrap(); - let room = client.get_left_room(&room_id).await; - assert!(room.is_none()); + // let room = client.get_left_room(&room_id).await; + // assert!(room.is_none()); - let room = client.get_joined_room(&room_id).await; - assert!(room.is_some()); + // let room = client.get_joined_room(&room_id).await; + // assert!(room.is_some()); - // test store reloads with correct room state from JsonStore - let store = Box::new(JsonStore::open(path).unwrap()); - let config = ClientConfig::default().state_store(store); - let joined_client = Client::new_with_config(homeserver, config).unwrap(); - joined_client.restore_login(session).await.unwrap(); + // // test store reloads with correct room state from JsonStore + // let store = Box::new(JsonStore::open(path).unwrap()); + // let config = ClientConfig::default().state_store(store); + // let joined_client = Client::new_with_config(homeserver, config).unwrap(); + // joined_client.restore_login(session).await.unwrap(); - // joined room reloaded from state store - joined_client - .sync_once(SyncSettings::default()) - .await - .unwrap(); - let room = joined_client.get_joined_room(&room_id).await; - assert!(room.is_some()); + // // joined room reloaded from state store + // joined_client + // .sync_once(SyncSettings::default()) + // .await + // .unwrap(); + // let room = joined_client.get_joined_room(&room_id).await; + // assert!(room.is_some()); - let _m = mock( - "GET", - Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), - ) - .with_status(200) - .with_body(test_json::LEAVE_SYNC_EVENT.to_string()) - .create(); + // let _m = mock( + // "GET", + // Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), + // ) + // .with_status(200) + // .with_body(test_json::LEAVE_SYNC_EVENT.to_string()) + // .create(); - joined_client - .sync_once(SyncSettings::default()) - .await - .unwrap(); + // joined_client + // .sync_once(SyncSettings::default()) + // .await + // .unwrap(); - let room = joined_client.get_joined_room(&room_id).await; - assert!(room.is_none()); + // let room = joined_client.get_joined_room(&room_id).await; + // assert!(room.is_none()); - let room = joined_client.get_left_room(&room_id).await; - assert!(room.is_some()); - } + // let room = joined_client.get_left_room(&room_id).await; + // assert!(room.is_some()); + // } #[tokio::test] async fn account_data() { @@ -2649,62 +2632,62 @@ mod test { .is_some()) } - #[tokio::test] - async fn test_client_sync_store() { - let homeserver = url::Url::from_str(&mockito::server_url()).unwrap(); + // #[tokio::test] + // async fn test_client_sync_store() { + // let homeserver = url::Url::from_str(&mockito::server_url()).unwrap(); - let session = Session { - access_token: "1234".to_owned(), - user_id: user_id!("@cheeky_monkey:matrix.org"), - device_id: "DEVICEID".into(), - }; + // let session = Session { + // access_token: "1234".to_owned(), + // user_id: user_id!("@cheeky_monkey:matrix.org"), + // device_id: "DEVICEID".into(), + // }; - let _m = mock( - "GET", - Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), - ) - .with_status(200) - .with_body(test_json::SYNC.to_string()) - .create(); + // let _m = mock( + // "GET", + // Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), + // ) + // .with_status(200) + // .with_body(test_json::SYNC.to_string()) + // .create(); - let _m = mock("POST", "/_matrix/client/r0/login") - .with_status(200) - .with_body(test_json::LOGIN.to_string()) - .create(); + // let _m = mock("POST", "/_matrix/client/r0/login") + // .with_status(200) + // .with_body(test_json::LOGIN.to_string()) + // .create(); - let dir = tempdir().unwrap(); - // a sync response to populate our JSON store - let config = - ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); - let client = Client::new_with_config(homeserver.clone(), config).unwrap(); - client.restore_login(session.clone()).await.unwrap(); - let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); + // let dir = tempdir().unwrap(); + // // a sync response to populate our JSON store + // let config = + // ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); + // let client = Client::new_with_config(homeserver.clone(), config).unwrap(); + // client.restore_login(session.clone()).await.unwrap(); + // let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); - // gather state to save to the db, the first time through loading will be skipped - let _ = client.sync_once(sync_settings.clone()).await.unwrap(); + // // gather state to save to the db, the first time through loading will be skipped + // let _ = client.sync_once(sync_settings.clone()).await.unwrap(); - // now syncing the client will update from the state store - let config = - ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); - let client = Client::new_with_config(homeserver, config).unwrap(); - client.restore_login(session.clone()).await.unwrap(); - client.sync_once(sync_settings).await.unwrap(); + // // now syncing the client will update from the state store + // let config = + // ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); + // let client = Client::new_with_config(homeserver, config).unwrap(); + // client.restore_login(session.clone()).await.unwrap(); + // client.sync_once(sync_settings).await.unwrap(); - let base_client = &client.base_client; + // let base_client = &client.base_client; - // assert the synced client and the logged in client are equal - assert_eq!(*base_client.session().read().await, Some(session)); - assert_eq!( - base_client.sync_token().await, - Some("s526_47314_0_7_1_1_1_11444_1".to_string()) - ); + // // assert the synced client and the logged in client are equal + // assert_eq!(*base_client.session().read().await, Some(session)); + // assert_eq!( + // base_client.sync_token().await, + // Some("s526_47314_0_7_1_1_1_11444_1".to_string()) + // ); - // This is commented out because this field is private... - // assert_eq!( - // *base_client.ignored_users.read().await, - // vec![user_id!("@someone:example.org")] - // ); - } + // // This is commented out because this field is private... + // // assert_eq!( + // // *base_client.ignored_users.read().await, + // // vec![user_id!("@someone:example.org")] + // // ); + // } #[tokio::test] async fn sync() { diff --git a/matrix_sdk/src/lib.rs b/matrix_sdk/src/lib.rs index 3cb65f27..d2d77733 100644 --- a/matrix_sdk/src/lib.rs +++ b/matrix_sdk/src/lib.rs @@ -66,16 +66,7 @@ compile_error!("only one of 'native-tls' or 'rustls-tls' features can be enabled #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub use matrix_sdk_base::crypto::LocalTrust; -#[cfg(not(target_arch = "wasm32"))] -pub use matrix_sdk_base::JsonStore; -pub use matrix_sdk_base::{ - CustomEvent, Error as BaseError, EventEmitter, Room, RoomMember, RoomState, Session, - StateStore, SyncRoom, -}; - -#[cfg(feature = "messages")] -#[cfg_attr(feature = "docs", doc(cfg(messages)))] -pub use matrix_sdk_base::{MessageQueue, PossiblyRedactedExt}; +pub use matrix_sdk_base::{Error as BaseError, RoomSummary, Session}; pub use matrix_sdk_common::*; pub use reqwest; diff --git a/matrix_sdk_base/Cargo.toml b/matrix_sdk_base/Cargo.toml index 1e64495e..99033cf6 100644 --- a/matrix_sdk_base/Cargo.toml +++ b/matrix_sdk_base/Cargo.toml @@ -26,6 +26,7 @@ docs = ["encryption", "sqlite_cryptostore", "messages"] [dependencies] async-trait = "0.1.41" serde = "1.0.116" +dashmap= "*" serde_json = "1.0.58" zeroize = "1.1.1" tracing = "0.1.21" @@ -37,6 +38,7 @@ matrix-sdk-crypto = { version = "0.1.0", path = "../matrix_sdk_crypto", optional # Misc dependencies thiserror = "1.0.21" sled = "0.34.4" +futures = "0.3.6" [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] version = "0.2.22" @@ -44,7 +46,6 @@ default-features = false features = ["sync", "fs"] [dev-dependencies] -futures = "0.3.6" matrix-sdk-test = { version = "0.1.0", path = "../matrix_sdk_test" } http = "0.2.1" tracing-subscriber = "0.2.13" diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index b0963563..7a704262 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -21,6 +21,8 @@ use std::{ sync::Arc, }; +use dashmap::DashMap; + #[cfg(feature = "encryption")] use matrix_sdk_common::locks::Mutex; use matrix_sdk_common::{ @@ -43,15 +45,16 @@ use matrix_sdk_common::{ #[cfg(feature = "encryption")] use matrix_sdk_crypto::{ store::{CryptoStore, CryptoStoreError}, - Device, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas, ToDeviceRequest, - UserDevices, + Device, EncryptionSettings, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas, + ToDeviceRequest, UserDevices, }; +use tracing::info; use zeroize::Zeroizing; use crate::{ error::Result, session::Session, - store::{StateChanges, Store}, + store::{RoomSummary, StateChanges, Store}, }; pub type Token = String; @@ -180,6 +183,7 @@ pub struct BaseClient { pub(crate) sync_token: Arc>>, /// Database store: Store, + joined_rooms: Arc>, #[cfg(feature = "encryption")] olm: Arc>>, #[cfg(feature = "encryption")] @@ -282,10 +286,18 @@ impl BaseClient { /// * `config` - An optional session if the user already has one from a /// previous login call. pub fn new_with_config(config: BaseClientConfig) -> Result { + let store = if let Some(path) = &config.store_path { + info!("Opening store in path {}", path.display()); + Store::open_with_path(path) + } else { + Store::open() + }; + Ok(BaseClient { session: Arc::new(RwLock::new(None)), sync_token: Arc::new(RwLock::new(None)), - store: Store::open(), + store, + joined_rooms: Arc::new(DashMap::new()), #[cfg(feature = "encryption")] olm: Arc::new(Mutex::new(None)), #[cfg(feature = "encryption")] @@ -429,27 +441,45 @@ impl BaseClient { } } - let changes = StateChanges::default(); + let mut changes = StateChanges::default(); for (room_id, room) in &response.rooms.join { for e in &room.state.events { + // info!("Handling event for room {} {:#?}", room_id, e); if let Ok(event) = hoist_and_deserialize_state_event(e) { match event { AnySyncStateEvent::RoomMember(member) => { - let member_id = UserId::try_from(member.state_key).unwrap(); - let prev_member = - self.store.get_member_event(room_id, &member_id).await; + // let member_id = UserId::try_from(member.state_key.clone()).unwrap(); + // self.store.get_member_event(room_id, &member_id).await; use matrix_sdk_common::events::room::member::MembershipState::*; - match member.content.membership { + // TODO this isn't right, check the diff against + // your previous state. + match &member.content.membership { Join => { + info!("ADDING MEMBER {} to {}", member.state_key, room_id); + changes.add_joined_member(room_id, member) // TODO check if the display name is // ambigous } + Invited => { + info!( + "ADDING INVITED MEMBER {} to {}", + member.state_key, room_id + ); + changes.add_invited_member(room_id, member) + } _ => (), } } - _ => (), + AnySyncStateEvent::RoomCreate(create) => { + info!("Creating new room {}", room_id); + let room = + RoomSummary::new(self.store.clone(), room_id, &create.content); + self.joined_rooms.insert(room_id.clone(), room.clone()); + changes.add_room_summary(room_id.clone(), room); + } + _ => changes.add_state_event(room_id, event), } } } @@ -542,28 +572,27 @@ impl BaseClient { #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub async fn share_group_session(&self, room_id: &RoomId) -> Result>> { - todo!() - // let room = self.get_joined_room(room_id).await.expect("No room found"); - // let olm = self.olm.lock().await; + let olm = self.olm.lock().await; - // match &*olm { - // Some(o) => { - // let room = room.write().await; + match &*olm { + Some(o) => { + // XXX: We construct members in a slightly roundabout way instead of chaining the + // iterators directly because of https://github.com/rust-lang/rust/issues/64552 + // let joined_members = room.joined_members.keys(); + // let invited_members = room.joined_members.keys(); + // let members: Vec<&UserId> = joined_members.chain(invited_members).collect(); + let members = self.store.get_joined_members(room_id).await; + Ok( + o.share_group_session(room_id, members.iter(), EncryptionSettings::default()) + .await?, + ) + } + None => panic!("Olm machine wasn't started"), + } + } - // // XXX: We construct members in a slightly roundabout way instead of chaining the - // // iterators directly because of https://github.com/rust-lang/rust/issues/64552 - // let joined_members = room.joined_members.keys(); - // let invited_members = room.joined_members.keys(); - // let members: Vec<&UserId> = joined_members.chain(invited_members).collect(); - // Ok(o.share_group_session( - // room_id, - // members.into_iter(), - // room.encrypted.clone().unwrap_or_default(), - // ) - // .await?) - // } - // None => panic!("Olm machine wasn't started"), - // } + pub fn get_joined_room(&self, room_id: &RoomId) -> Option { + self.joined_rooms.get(room_id).map(|r| r.clone()) } /// Encrypt a message event content. diff --git a/matrix_sdk_base/src/lib.rs b/matrix_sdk_base/src/lib.rs index b503c28b..576a33f4 100644 --- a/matrix_sdk_base/src/lib.rs +++ b/matrix_sdk_base/src/lib.rs @@ -47,7 +47,7 @@ mod error; mod session; mod store; -pub use store::Store; +pub use store::{RoomSummary, Store}; pub use client::{BaseClient, BaseClientConfig, RoomState, RoomStateType}; diff --git a/matrix_sdk_base/src/store.rs b/matrix_sdk_base/src/store.rs index a1ac0b3a..0cf80737 100644 --- a/matrix_sdk_base/src/store.rs +++ b/matrix_sdk_base/src/store.rs @@ -1,9 +1,22 @@ -use std::{collections::BTreeMap, convert::TryFrom}; +use std::{ + collections::BTreeMap, + convert::TryFrom, + path::Path, + sync::{Arc, Mutex as SyncMutex}, +}; +use futures::executor::block_on; use matrix_sdk_common::{ - events::{room::member::MemberEventContent, AnySyncStateEvent, SyncStateEvent}, + events::{ + room::{ + create::CreateEventContent, encryption::EncryptionEventContent, + member::MemberEventContent, + }, + AnySyncStateEvent, SyncStateEvent, + }, identifiers::{RoomId, UserId}, }; +use serde::{Deserialize, Serialize}; use serde_json; use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree}; @@ -13,7 +26,9 @@ pub struct Store { inner: Db, session: Tree, members: Tree, + joined_user_ids: Tree, room_state: Tree, + room_summaries: Tree, } use crate::Session; @@ -23,9 +38,9 @@ pub struct StateChanges { session: Option, members: BTreeMap>>, state: BTreeMap>, - display_names: BTreeMap>>, - - added_user_ids: BTreeMap, + room_summaries: BTreeMap, + // display_names: BTreeMap>>, + joined_user_ids: BTreeMap, invited_user_ids: BTreeMap, removed_user_ids: BTreeMap, } @@ -37,12 +52,39 @@ impl StateChanges { event: SyncStateEvent, ) { let user_id = UserId::try_from(event.state_key.as_str()).unwrap(); + self.joined_user_ids + .insert(room_id.to_owned(), user_id.clone()); self.members .entry(room_id.to_owned()) .or_insert_with(BTreeMap::new) .insert(user_id, event); } + pub fn add_invited_member( + &mut self, + room_id: &RoomId, + event: SyncStateEvent, + ) { + let user_id = UserId::try_from(event.state_key.as_str()).unwrap(); + self.invited_user_ids + .insert(room_id.to_owned(), user_id.clone()); + self.members + .entry(room_id.to_owned()) + .or_insert_with(BTreeMap::new) + .insert(user_id, event); + } + + pub fn add_room_summary(&mut self, room_id: RoomId, summary: RoomSummary) { + self.room_summaries.insert(room_id, summary); + } + + pub fn add_state_event(&mut self, room_id: &RoomId, event: AnySyncStateEvent) { + self.state + .entry(room_id.to_owned()) + .or_insert_with(BTreeMap::new) + .insert(event.state_key().to_string(), event); + } + pub fn from_event(room_id: &RoomId, event: SyncStateEvent) -> Self { let mut changes = Self::default(); changes.add_joined_member(room_id, event); @@ -60,24 +102,136 @@ impl From for StateChanges { } } +#[derive(Debug, Clone)] +pub struct RoomSummary { + room_id: Arc, + inner: Arc>, + store: Store, +} + +impl RoomSummary { + pub fn new(store: Store, room_id: &RoomId, creation_event: &CreateEventContent) -> Self { + let room_id = Arc::new(room_id.clone()); + + Self { + room_id: room_id.clone(), + store, + inner: Arc::new(SyncMutex::new(InnerSummary { + creation_content: creation_event.clone(), + room_id, + encryption: None, + last_prev_batch: None, + })), + } + } + + fn serialize(&self) -> Vec { + let inner = self.inner.lock().unwrap(); + serde_json::to_vec(&*inner).unwrap() + } + + pub async fn joined_user_ids(&self) -> Vec { + self.store.get_joined_members(&self.room_id).await + } + + pub fn is_encrypted(&self) -> bool { + self.inner.lock().unwrap().encryption.is_some() + } + + pub fn get_member(&self, user_id: &UserId) -> Option { + block_on(self.store.get_member_event(&self.room_id, user_id)).map(|e| e.into()) + } + + pub fn room_id(&self) -> &RoomId { + &self.room_id + } + + pub fn display_name(&self) -> String { + "TEST ROOM NAME".to_string() + } +} + +#[derive(Debug)] +pub struct RoomMember { + event: Arc>, +} + +impl RoomMember { + pub fn display_name(&self) -> &Option { + &self.event.content.displayname + } + + pub fn disambiguated_name(&self) -> String { + self.event.state_key.clone() + } + + pub fn name(&self) -> String { + self.event.state_key.clone() + } + + pub fn unique_name(&self) -> String { + self.event.state_key.clone() + } +} + +impl From> for RoomMember { + fn from(event: SyncStateEvent) -> Self { + Self { + event: Arc::new(event), + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct InnerSummary { + room_id: Arc, + creation_content: CreateEventContent, + encryption: Option, + last_prev_batch: Option, +} + impl Store { - pub fn open() -> Self { - let db = Config::new().temporary(true).open().unwrap(); + fn open_helper(db: Db) -> Self { let session = db.open_tree("session").unwrap(); + let members = db.open_tree("members").unwrap(); - let room_state = db.open_tree("members").unwrap(); + let joined_user_ids = db.open_tree("joined_user_ids").unwrap(); + + let room_state = db.open_tree("room_state").unwrap(); + let room_summaries = db.open_tree("room_summaries").unwrap(); Self { inner: db, session, members, + joined_user_ids, room_state, + room_summaries, } } + pub fn open() -> Self { + let db = Config::new().temporary(true).open().unwrap(); + + Store::open_helper(db) + } + + pub fn open_with_path(path: impl AsRef) -> Self { + let path = path.as_ref().join("matrix-sdk-state"); + let db = Config::new().temporary(false).path(path).open().unwrap(); + + Store::open_helper(db) + } + pub async fn save_changes(&self, changes: &StateChanges) { - let ret: TransactionResult<()> = - (&self.session, &self.members).transaction(|(session, members)| { + let ret: TransactionResult<()> = ( + &self.session, + &self.members, + &self.joined_user_ids, + &self.room_state, + &self.room_summaries, + ) + .transaction(|(session, members, joined, state, summaries)| { if let Some(s) = &changes.session { session.insert("session", serde_json::to_vec(s).unwrap())?; } @@ -91,6 +245,23 @@ impl Store { } } + for (room, user) in &changes.joined_user_ids { + joined.insert(room.as_bytes(), user.as_bytes())?; + } + + for (room, events) in &changes.state { + for (state_key, event) in events { + state.insert( + format!("{}{}", room.as_str(), state_key).as_bytes(), + serde_json::to_vec(&event).unwrap(), + )?; + } + } + + for (room_id, summary) in &changes.room_summaries { + summaries.insert(room_id.as_str().as_bytes(), summary.serialize())?; + } + Ok(()) }); @@ -110,6 +281,13 @@ impl Store { .map(|v| serde_json::from_slice(&v).unwrap()) } + pub async fn get_joined_members(&self, room_id: &RoomId) -> Vec { + self.joined_user_ids + .scan_prefix(room_id.as_bytes()) + .map(|u| UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap()) + .collect() + } + pub fn get_session(&self) -> Option { self.session .get("session")