diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index 608f6c57..929802d7 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -30,6 +30,7 @@ docs = ["encryption", "sqlite_cryptostore", "messages"] [dependencies] async-trait = "0.1.41" dashmap = { version = "3.11.10", optional = true } +futures = "0.3.6" http = "0.2.1" serde_json = "1.0.59" thiserror = "1.0.21" @@ -78,7 +79,6 @@ tracing-subscriber = "0.2.13" tempfile = "3.1.0" mockito = "0.27.0" lazy_static = "1.4.0" -futures = "0.3.6" [[example]] name = "emoji_verification" diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 4f04a4bc..49a8d859 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -27,6 +27,7 @@ use std::{ #[cfg(feature = "encryption")] use dashmap::DashMap; +use futures::StreamExt; use futures_timer::Delay as sleep; use http::HeaderValue; use mime::{self, Mime}; @@ -1010,8 +1011,9 @@ impl Client { { let room = self.base_client.get_joined_room(room_id).unwrap(); let members = room.joined_user_ids().await; - let mut members_iter = members.iter(); - self.claim_one_time_keys(&mut members_iter).await?; + // TODO don't collect here. + let members_iter: Vec = members.collect().await; + self.claim_one_time_keys(&mut members_iter.iter()).await?; }; let response = self.share_group_session(room_id).await; diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index 50942670..c3942620 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -22,6 +22,7 @@ use std::{ use dashmap::DashMap; +use futures::StreamExt; #[cfg(feature = "encryption")] use matrix_sdk_common::locks::Mutex; use matrix_sdk_common::{ @@ -512,6 +513,19 @@ impl BaseClient { } } + #[cfg(feature = "encryption")] + if summary.is_encrypted() { + if let Some(o) = self.olm_machine().await { + if let Some(users) = changes.joined_user_ids.get(room_id) { + o.update_tracked_users(users).await + } + + if let Some(users) = changes.invited_user_ids.get(room_id) { + o.update_tracked_users(users).await + } + } + } + changes.add_room(summary); } @@ -615,12 +629,10 @@ impl BaseClient { match &*olm { Some(o) => { - // XXX: We construct members in a slightly roundabout way instead of chaining the - // iterators directly because of https://github.com/rust-lang/rust/issues/64552 - // let joined_members = room.joined_members.keys(); - // let invited_members = room.joined_members.keys(); - // let members: Vec<&UserId> = joined_members.chain(invited_members).collect(); - let members = self.store.get_joined_user_ids(room_id).await; + let joined = self.store.get_joined_user_ids(room_id).await; + let invited = self.store.get_invited_user_ids(room_id).await; + // TODO don't use collect here. + let members: Vec = joined.chain(invited).collect().await; Ok( o.share_group_session(room_id, members.iter(), EncryptionSettings::default()) .await?, diff --git a/matrix_sdk_base/src/store.rs b/matrix_sdk_base/src/store.rs index 36697d96..38cce930 100644 --- a/matrix_sdk_base/src/store.rs +++ b/matrix_sdk_base/src/store.rs @@ -5,7 +5,11 @@ use std::{ sync::{Arc, Mutex as SyncMutex}, }; -use futures::executor::block_on; +use futures::{ + executor::block_on, + future, + stream::{self, Stream, StreamExt}, +}; use matrix_sdk_common::{ api::r0::sync::sync_events::RoomSummary as RumaSummary, events::{ @@ -40,8 +44,8 @@ pub struct StateChanges { state: BTreeMap>, pub room_summaries: BTreeMap, // display_names: BTreeMap>>, - joined_user_ids: BTreeMap>, - invited_user_ids: BTreeMap>, + pub joined_user_ids: BTreeMap>, + pub invited_user_ids: BTreeMap>, removed_user_ids: BTreeMap, } @@ -154,20 +158,18 @@ impl Room { } } - pub async fn get_j_members(&self) -> Vec { + pub async fn get_j_members(&self) -> impl Stream + '_ { let joined = self.store.get_joined_user_ids(self.room_id()).await; let invited = self.store.get_invited_user_ids(self.room_id()).await; - let mut members = Vec::new(); - - for user in joined.into_iter().chain(invited) { + let x = move |u| async move { self.store - .get_member_event(self.room_id(), &user) + .get_member_event(self.room_id(), &u) .await - .map(|e| members.push(e.into())); - } + .map(|m| m.into()) + }; - members + joined.chain(invited).filter_map(x) } /// Calculate the canonical display name of the room, taking into account @@ -194,38 +196,43 @@ impl Room { let members = self.get_j_members().await; - info!( - "Calculating name for {}, hero count {} members {:#?}", - self.room_id(), - heroes_count, - members - ); + // info!( + // "Calculating name for {}, hero count {} members {:#?}", + // self.room_id(), + // heroes_count, + // members + // ); // TODO: This should use `self.heroes` but it is always empty?? + // + let own_user_id = self.own_user_id.clone(); + + let is_own_member = |m: &RoomMember| &m.user_id() == &*own_user_id; + if heroes_count >= invited_joined { let mut names = members - .iter() - .filter(|m| &m.user_id() != &*self.own_user_id) + .filter(|m| future::ready(is_own_member(m))) .take(3) .map(|mem| { mem.display_name() .clone() .unwrap_or_else(|| mem.user_id().localpart().to_string()) }) - .collect::>(); + .collect::>() + .await; // stabilize ordering names.sort(); names.join(", ") } else if heroes_count < invited_joined && invited + joined > 1 { let mut names = members - .iter() - .filter(|m| &m.user_id() != &*self.own_user_id) + .filter(|m| future::ready(is_own_member(m))) .take(3) .map(|mem| { mem.display_name() .clone() .unwrap_or_else(|| mem.user_id().localpart().to_string()) }) - .collect::>(); + .collect::>() + .await; names.sort(); // TODO: What length does the spec want us to use here and in @@ -241,7 +248,7 @@ impl Room { (*self.inner.lock().unwrap()).clone() } - pub async fn joined_user_ids(&self) -> Vec { + pub async fn joined_user_ids(&self) -> impl Stream { self.store.get_joined_user_ids(&self.room_id).await } @@ -251,7 +258,6 @@ impl Room { pub fn update_summary(&self, summary: InnerSummary) { let mut inner = self.inner.lock().unwrap(); - info!("UPDAGING SUMMARY FOR {} WITH {:#?}", self.room_id, summary); *inner = summary; } @@ -263,6 +269,10 @@ impl Room { &self.room_id } + pub fn last_prev_batch(&self) -> Option { + self.inner.lock().unwrap().last_prev_batch.clone() + } + pub async fn display_name(&self) -> String { self.calculate_name().await } @@ -359,6 +369,10 @@ impl InnerSummary { } } + pub fn is_encrypted(&self) -> bool { + self.encryption.is_some() + } + pub(crate) fn update(&mut self, summary: &RumaSummary) -> bool { let mut changed = false; @@ -425,7 +439,6 @@ impl Store { } pub async fn save_changes(&self, changes: &StateChanges) { - info!("SAVING CHANGES OF SIZE {}", std::mem::size_of_val(changes)); let ret: TransactionResult<()> = ( &self.session, &self.members, @@ -504,18 +517,24 @@ impl Store { .map(|v| serde_json::from_slice(&v).unwrap()) } - pub async fn get_invited_user_ids(&self, room_id: &RoomId) -> Vec { - self.invited_user_ids - .scan_prefix(room_id.as_bytes()) - .map(|u| UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap()) - .collect() + pub async fn get_invited_user_ids(&self, room_id: &RoomId) -> impl Stream { + stream::iter( + self.invited_user_ids + .scan_prefix(room_id.as_bytes()) + .map(|u| { + UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap() + }), + ) } - pub async fn get_joined_user_ids(&self, room_id: &RoomId) -> Vec { - self.joined_user_ids - .scan_prefix(room_id.as_bytes()) - .map(|u| UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap()) - .collect() + pub async fn get_joined_user_ids(&self, room_id: &RoomId) -> impl Stream { + stream::iter( + self.joined_user_ids + .scan_prefix(room_id.as_bytes()) + .map(|u| { + UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap() + }), + ) } pub fn get_session(&self) -> Option {