From dc57873687b44f870d7048c568f88680eef75cbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Sun, 25 Oct 2020 21:03:03 +0100 Subject: [PATCH] base: WIP more work on the new state store. --- matrix_sdk/src/client.rs | 4 +- matrix_sdk/src/lib.rs | 2 +- matrix_sdk_base/src/client.rs | 161 ++++++++++++++++----------- matrix_sdk_base/src/lib.rs | 2 +- matrix_sdk_base/src/store.rs | 199 +++++++++++++++++++++++++++++++--- 5 files changed, 289 insertions(+), 79 deletions(-) diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 324025e0..4827fc05 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -40,7 +40,7 @@ use zeroize::Zeroizing; use tracing::{debug, warn}; use tracing::{error, info, instrument}; -use matrix_sdk_base::{BaseClient, BaseClientConfig, RoomSummary, Session}; +use matrix_sdk_base::{BaseClient, BaseClientConfig, Room, Session}; #[cfg(feature = "encryption")] use matrix_sdk_base::crypto::{ @@ -456,7 +456,7 @@ impl Client { /// # Arguments /// /// `room_id` - The unique id of the room that should be fetched. - pub fn get_joined_room(&self, room_id: &RoomId) -> Option { + pub fn get_joined_room(&self, room_id: &RoomId) -> Option { self.base_client.get_joined_room(room_id) } diff --git a/matrix_sdk/src/lib.rs b/matrix_sdk/src/lib.rs index d2d77733..019ecced 100644 --- a/matrix_sdk/src/lib.rs +++ b/matrix_sdk/src/lib.rs @@ -66,7 +66,7 @@ compile_error!("only one of 'native-tls' or 'rustls-tls' features can be enabled #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub use matrix_sdk_base::crypto::LocalTrust; -pub use matrix_sdk_base::{Error as BaseError, RoomSummary, Session}; +pub use matrix_sdk_base::{Error as BaseError, Room, Session}; pub use matrix_sdk_common::*; pub use reqwest; diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index 7a704262..8b86c6eb 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -29,7 +29,7 @@ use matrix_sdk_common::{ api::r0 as api, events::{ room::member::MemberEventContent, AnyStrippedStateEvent, AnySyncRoomEvent, - AnySyncStateEvent, SyncStateEvent, + AnySyncStateEvent, SyncStateEvent, AnySyncMessageEvent }, identifiers::{RoomId, UserId}, locks::RwLock, @@ -51,10 +51,11 @@ use matrix_sdk_crypto::{ use tracing::info; use zeroize::Zeroizing; +use crate::store::RoomType; use crate::{ error::Result, session::Session, - store::{RoomSummary, StateChanges, Store}, + store::{Room, StateChanges, Store}, }; pub type Token = String; @@ -77,7 +78,7 @@ pub struct AdditionalUnsignedData { pub prev_content: Option>, } -/// Transform room event by hoisting `prev_content` field from `unsigned` to the top level. +/// Transform state event by hoisting `prev_content` field from `unsigned` to the top level. /// /// Due to a [bug in synapse][synapse-bug], `prev_content` often ends up in `unsigned` contrary to /// the C2S spec. Some more discussion can be found [here][discussion]. Until this is fixed in @@ -85,32 +86,6 @@ pub struct AdditionalUnsignedData { /// /// [synapse-bug]: /// [discussion]: -fn hoist_room_event_prev_content(event: &Raw) -> Option> { - let prev_content = serde_json::from_str::(event.json().get()) - .map(|more_unsigned| more_unsigned.unsigned) - .map(|additional| additional.prev_content) - .ok() - .flatten()?; - - let mut ev = event.deserialize().ok()?; - - match &mut ev { - AnySyncRoomEvent::State(AnySyncStateEvent::RoomMember(ref mut member)) - if member.prev_content.is_none() => - { - if let Ok(prev) = prev_content.deserialize() { - member.prev_content = Some(prev) - } - - Some(Raw::from(ev)) - } - _ => None, - } -} - -/// Transform state event by hoisting `prev_content` field from `unsigned` to the top level. -/// -/// See comment of `hoist_room_event_prev_content`. fn hoist_and_deserialize_state_event( event: &Raw, ) -> StdResult { @@ -129,6 +104,25 @@ fn hoist_and_deserialize_state_event( Ok(ev) } +fn hoist_room_event_prev_content(event: &Raw) -> StdResult { + let prev_content = serde_json::from_str::(event.json().get()) + .map(|more_unsigned| more_unsigned.unsigned) + .map(|additional| additional.prev_content)?; + + let mut ev = event.deserialize()?; + + match &mut ev { + AnySyncRoomEvent::State(AnySyncStateEvent::RoomMember(ref mut member)) + if member.prev_content.is_none() => + { + member.prev_content = prev_content.map(|p| p.deserialize().ok()).flatten(); + } + _ => (), + } + + Ok(ev) +} + fn calculate_membership_change( existing_event: SyncStateEvent, new_event: SyncStateEvent, @@ -183,7 +177,7 @@ pub struct BaseClient { pub(crate) sync_token: Arc>>, /// Database store: Store, - joined_rooms: Arc>, + joined_rooms: Arc>, #[cfg(feature = "encryption")] olm: Arc>>, #[cfg(feature = "encryption")] @@ -410,6 +404,17 @@ impl BaseClient { self.sync_token.read().await.clone() } + fn get_or_create_room(&self, room_id: &RoomId, room_type: RoomType) -> Room { + match room_type { + RoomType::Joined => self + .joined_rooms + .entry(room_id.clone()) + .or_insert_with(|| Room::new(self.store.clone(), room_id, room_type)) + .clone(), + _ => todo!(), + } + } + /// Receive a response from a sync call. /// /// # Arguments @@ -426,8 +431,6 @@ impl BaseClient { return Ok(()); } - *self.sync_token.write().await = Some(response.next_batch.clone()); - #[cfg(feature = "encryption")] { let olm = self.olm.lock().await; @@ -437,55 +440,89 @@ impl BaseClient { // decryptes to-device events, but leaves room events alone. // This makes sure that we have the deryption keys for the room // events at hand. - o.receive_sync_response(response).await; + o.receive_sync_response(response).await?; } } let mut changes = StateChanges::default(); - for (room_id, room) in &response.rooms.join { - for e in &room.state.events { - // info!("Handling event for room {} {:#?}", room_id, e); + let handle_membership = + |changes: &mut StateChanges, room_id, event: SyncStateEvent| { + // let member_id = UserId::try_from(member.state_key.clone()).unwrap(); + // self.store.get_member_event(room_id, &member_id).await; + + // TODO this isn't right, check the diff against + // your previous state. + use matrix_sdk_common::events::room::member::MembershipState::*; + match &event.content.membership { + Join => { + info!("ADDING MEMBER {} to {}", event.state_key, room_id); + changes.add_joined_member(room_id, event) + // TODO check if the display name is + // ambigous + } + Invite => { + info!("ADDING INVITED MEMBER {} to {}", event.state_key, room_id); + changes.add_invited_member(room_id, event) + } + _ => (), + } + }; + + for (room_id, room_info) in &response.rooms.join { + let room = self.get_or_create_room(room_id, RoomType::Joined); + + if room.update_summary(&room_info.summary) { + changes.add_room(room.clone()) + } + + for e in &room_info.state.events { if let Ok(event) = hoist_and_deserialize_state_event(e) { match event { AnySyncStateEvent::RoomMember(member) => { - // let member_id = UserId::try_from(member.state_key.clone()).unwrap(); - // self.store.get_member_event(room_id, &member_id).await; - use matrix_sdk_common::events::room::member::MembershipState::*; + handle_membership(&mut changes, room_id, member); + } + _ => { + if room.handle_state_event(&event) { + changes.add_room(room.clone()); + } + changes.add_state_event(room_id, event); + } + } + } + } - // TODO this isn't right, check the diff against - // your previous state. - match &member.content.membership { - Join => { - info!("ADDING MEMBER {} to {}", member.state_key, room_id); - changes.add_joined_member(room_id, member) - // TODO check if the display name is - // ambigous + if room.set_prev_batch(room_info.timeline.prev_batch.clone()) { + changes.add_room(room.clone()); + } + + for event in &room_info.timeline.events { + if let Ok(e) = hoist_room_event_prev_content(event) { + match e { + AnySyncRoomEvent::State(s) => { + match s { + AnySyncStateEvent::RoomMember(member) => { + handle_membership(&mut changes, room_id, member); } - Invited => { - info!( - "ADDING INVITED MEMBER {} to {}", - member.state_key, room_id - ); - changes.add_invited_member(room_id, member) + _ => { + if room.handle_state_event(&s) { + changes.add_room(room.clone()); + } + changes.add_state_event(room_id, s); } - _ => (), } } - AnySyncStateEvent::RoomCreate(create) => { - info!("Creating new room {}", room_id); - let room = - RoomSummary::new(self.store.clone(), room_id, &create.content); - self.joined_rooms.insert(room_id.clone(), room.clone()); - changes.add_room_summary(room_id.clone(), room); + AnySyncRoomEvent::Message(_) => { + // TODO decrypt the event if it's an encrypted one. } - _ => changes.add_state_event(room_id, event), + _ => (), } } } } self.store.save_changes(&changes).await; + *self.sync_token.write().await = Some(response.next_batch.clone()); Ok(()) } @@ -591,7 +628,7 @@ impl BaseClient { } } - pub fn get_joined_room(&self, room_id: &RoomId) -> Option { + pub fn get_joined_room(&self, room_id: &RoomId) -> Option { self.joined_rooms.get(room_id).map(|r| r.clone()) } diff --git a/matrix_sdk_base/src/lib.rs b/matrix_sdk_base/src/lib.rs index 576a33f4..64e1877b 100644 --- a/matrix_sdk_base/src/lib.rs +++ b/matrix_sdk_base/src/lib.rs @@ -47,7 +47,7 @@ mod error; mod session; mod store; -pub use store::{RoomSummary, Store}; +pub use store::{Room, Store}; pub use client::{BaseClient, BaseClientConfig, RoomState, RoomStateType}; diff --git a/matrix_sdk_base/src/store.rs b/matrix_sdk_base/src/store.rs index 0cf80737..997e2647 100644 --- a/matrix_sdk_base/src/store.rs +++ b/matrix_sdk_base/src/store.rs @@ -6,20 +6,25 @@ use std::{ }; use futures::executor::block_on; +use matrix_sdk_common::Raw; +use matrix_sdk_common::events::room::canonical_alias::CanonicalAliasEventContent; +use matrix_sdk_common::events::room::name::NameEventContent; +use matrix_sdk_common::identifiers::RoomAliasId; use matrix_sdk_common::{ + api::r0::sync::sync_events::RoomSummary as RumaSummary, events::{ room::{ create::CreateEventContent, encryption::EncryptionEventContent, member::MemberEventContent, }, - AnySyncStateEvent, SyncStateEvent, + AnySyncStateEvent, EventContent, SyncStateEvent, }, identifiers::{RoomId, UserId}, }; use serde::{Deserialize, Serialize}; -use serde_json; use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree}; +use tracing::info; #[derive(Debug, Clone)] pub struct Store { @@ -38,7 +43,7 @@ pub struct StateChanges { session: Option, members: BTreeMap>>, state: BTreeMap>, - room_summaries: BTreeMap, + room_summaries: BTreeMap, // display_names: BTreeMap>>, joined_user_ids: BTreeMap, invited_user_ids: BTreeMap, @@ -74,8 +79,8 @@ impl StateChanges { .insert(user_id, event); } - pub fn add_room_summary(&mut self, room_id: RoomId, summary: RoomSummary) { - self.room_summaries.insert(room_id, summary); + pub fn add_room(&mut self, room: Room) { + self.room_summaries.insert(room.room_id().to_owned(), room); } pub fn add_state_event(&mut self, room_id: &RoomId, event: AnySyncStateEvent) { @@ -103,28 +108,73 @@ impl From for StateChanges { } #[derive(Debug, Clone)] -pub struct RoomSummary { +pub struct Room { room_id: Arc, inner: Arc>, store: Store, } -impl RoomSummary { - pub fn new(store: Store, room_id: &RoomId, creation_event: &CreateEventContent) -> Self { +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct SomeSummary { + heroes: Vec, + joined_member_count: u64, + invited_member_count: u64, +} + +/// Signals to the `BaseClient` which `RoomState` to send to `EventEmitter`. +#[derive(Debug, Serialize, Deserialize)] +pub enum RoomType { + /// Represents a joined room, the `joined_rooms` HashMap will be used. + Joined, + /// Represents a left room, the `left_rooms` HashMap will be used. + Left, + /// Represents an invited room, the `invited_rooms` HashMap will be used. + Invited, +} + +impl Room { + pub fn new(store: Store, room_id: &RoomId, room_type: RoomType) -> Self { let room_id = Arc::new(room_id.clone()); Self { room_id: room_id.clone(), store, inner: Arc::new(SyncMutex::new(InnerSummary { - creation_content: creation_event.clone(), room_id, + room_type, encryption: None, + summary: Default::default(), last_prev_batch: None, + name: None, + canonical_alias: None, + avatar_url: None, })), } } + pub fn handle_state_events(&self, state_events: &[Raw]) -> InnerSummary { + todo!(); + } + + pub fn handle_state_event(&self, event: &AnySyncStateEvent) -> bool { + match event { + AnySyncStateEvent::RoomEncryption(encryption) => { + info!("MARKING ROOM {} AS ENCRYPTED", self.room_id); + self.mark_as_encrypted(encryption); + true + } + AnySyncStateEvent::RoomName(n) => { + self.set_name(&n); + true + } + AnySyncStateEvent::RoomCanonicalAlias(a) => { + self.set_canonical_alias(&a); + true + } + _ => false, + } + } + fn serialize(&self) -> Vec { let inner = self.inner.lock().unwrap(); serde_json::to_vec(&*inner).unwrap() @@ -138,6 +188,56 @@ impl RoomSummary { self.inner.lock().unwrap().encryption.is_some() } + pub fn mark_as_encrypted(&self, event: &SyncStateEvent) { + self.inner.lock().unwrap().encryption = Some(event.content.clone()); + } + + pub fn set_name(&self, event: &SyncStateEvent) { + self.inner.lock().unwrap().name = event.content.name().map(|n| n.to_string()); + } + + pub fn set_canonical_alias(&self, event: &SyncStateEvent) { + self.inner.lock().unwrap().canonical_alias = event.content.alias.clone(); + } + + pub fn set_prev_batch(&self, prev_batch: Option) -> bool { + let mut inner = self.inner.lock().unwrap(); + + if inner.last_prev_batch != prev_batch { + inner.last_prev_batch = prev_batch; + true + } else { + false + } + } + + pub fn update_summary(&self, summary: &RumaSummary) -> bool { + let mut inner = self.inner.lock().unwrap(); + + let mut changed = false; + + info!("UPDAGING SUMMARY FOR {} WITH {:#?}", self.room_id, summary); + + if !summary.is_empty() { + if !summary.heroes.is_empty() { + inner.summary.heroes = summary.heroes.clone(); + changed = true; + } + + if let Some(joined) = summary.joined_member_count { + inner.summary.joined_member_count = joined.into(); + changed = true; + } + + if let Some(invited) = summary.invited_member_count { + inner.summary.invited_member_count = invited.into(); + changed = true; + } + } + + changed + } + pub fn get_member(&self, user_id: &UserId) -> Option { block_on(self.store.get_member_event(&self.room_id, user_id)).map(|e| e.into()) } @@ -147,7 +247,7 @@ impl RoomSummary { } pub fn display_name(&self) -> String { - "TEST ROOM NAME".to_string() + self.inner.lock().unwrap().calculate_name() } } @@ -183,13 +283,79 @@ impl From> for RoomMember { } #[derive(Debug, Serialize, Deserialize)] -struct InnerSummary { +pub struct InnerSummary { room_id: Arc, - creation_content: CreateEventContent, + room_type: RoomType, + + name: Option, + canonical_alias: Option, + avatar_url: Option, + + summary: SomeSummary, + encryption: Option, last_prev_batch: Option, } +impl InnerSummary { + /// Calculate the canonical display name of the room, taking into account + /// its name, aliases and members. + /// + /// The display name is calculated according to [this algorithm][spec]. + /// + /// [spec]: + /// + pub fn calculate_name(&self) -> String { + if let Some(name) = &self.name { + let name = name.trim(); + name.to_string() + } else if let Some(alias) = &self.canonical_alias { + let alias = alias.alias().trim(); + alias.to_string() + } else { + // let joined = self.joined_member_count.unwrap_or_else(|| uint!(0)); + // let invited = self.invited_member_count.unwrap_or_else(|| uint!(0)); + // let heroes_count = self.summary.heroes.len(); + // let invited_joined = (invited + joined).saturating_sub(uint!(1)); + + // let members = joined_members.values().chain(invited_members.values()); + + // // TODO: This should use `self.heroes` but it is always empty?? + // if heroes >= invited_joined { + // let mut names = members + // .filter(|m| m.user_id != *own_user_id) + // .take(3) + // .map(|mem| { + // mem.display_name + // .clone() + // .unwrap_or_else(|| mem.user_id.localpart().to_string()) + // }) + // .collect::>(); + // // stabilize ordering + // names.sort(); + // names.join(", ") + // } else if heroes < invited_joined && invited + joined > uint!(1) { + // let mut names = members + // .filter(|m| m.user_id != *own_user_id) + // .take(3) + // .map(|mem| { + // mem.display_name + // .clone() + // .unwrap_or_else(|| mem.user_id.localpart().to_string()) + // }) + // .collect::>(); + // names.sort(); + + // // TODO: What length does the spec want us to use here and in + // // the `else`? + // format!("{}, and {} others", names.join(", "), (joined + invited)) + // } else { + "Empty room".to_string() + // } + } + } +} + impl Store { fn open_helper(db: Db) -> Self { let session = db.open_tree("session").unwrap(); @@ -224,6 +390,7 @@ impl Store { } pub async fn save_changes(&self, changes: &StateChanges) { + info!("SAVING CHANGES OF SIZE {}", std::mem::size_of_val(changes)); let ret: TransactionResult<()> = ( &self.session, &self.members, @@ -252,7 +419,13 @@ impl Store { for (room, events) in &changes.state { for (state_key, event) in events { state.insert( - format!("{}{}", room.as_str(), state_key).as_bytes(), + format!( + "{}{}{}", + room.as_str(), + event.content().event_type(), + state_key + ) + .as_bytes(), serde_json::to_vec(&event).unwrap(), )?; }