store: Use streams so we don't load all members at once.

This commit is contained in:
Damir Jelić 2020-11-16 18:11:12 +01:00
parent 8ed8929788
commit 3da1d3cf8f
4 changed files with 78 additions and 45 deletions

View file

@ -30,6 +30,7 @@ docs = ["encryption", "sqlite_cryptostore", "messages"]
[dependencies]
async-trait = "0.1.41"
dashmap = { version = "3.11.10", optional = true }
futures = "0.3.6"
http = "0.2.1"
serde_json = "1.0.59"
thiserror = "1.0.21"
@ -78,7 +79,6 @@ tracing-subscriber = "0.2.13"
tempfile = "3.1.0"
mockito = "0.27.0"
lazy_static = "1.4.0"
futures = "0.3.6"
[[example]]
name = "emoji_verification"

View file

@ -27,6 +27,7 @@ use std::{
#[cfg(feature = "encryption")]
use dashmap::DashMap;
use futures::StreamExt;
use futures_timer::Delay as sleep;
use http::HeaderValue;
use mime::{self, Mime};
@ -1010,8 +1011,9 @@ impl Client {
{
let room = self.base_client.get_joined_room(room_id).unwrap();
let members = room.joined_user_ids().await;
let mut members_iter = members.iter();
self.claim_one_time_keys(&mut members_iter).await?;
// TODO don't collect here.
let members_iter: Vec<UserId> = members.collect().await;
self.claim_one_time_keys(&mut members_iter.iter()).await?;
};
let response = self.share_group_session(room_id).await;

View file

@ -22,6 +22,7 @@ use std::{
use dashmap::DashMap;
use futures::StreamExt;
#[cfg(feature = "encryption")]
use matrix_sdk_common::locks::Mutex;
use matrix_sdk_common::{
@ -512,6 +513,19 @@ impl BaseClient {
}
}
#[cfg(feature = "encryption")]
if summary.is_encrypted() {
if let Some(o) = self.olm_machine().await {
if let Some(users) = changes.joined_user_ids.get(room_id) {
o.update_tracked_users(users).await
}
if let Some(users) = changes.invited_user_ids.get(room_id) {
o.update_tracked_users(users).await
}
}
}
changes.add_room(summary);
}
@ -615,12 +629,10 @@ impl BaseClient {
match &*olm {
Some(o) => {
// XXX: We construct members in a slightly roundabout way instead of chaining the
// iterators directly because of https://github.com/rust-lang/rust/issues/64552
// let joined_members = room.joined_members.keys();
// let invited_members = room.joined_members.keys();
// let members: Vec<&UserId> = joined_members.chain(invited_members).collect();
let members = self.store.get_joined_user_ids(room_id).await;
let joined = self.store.get_joined_user_ids(room_id).await;
let invited = self.store.get_invited_user_ids(room_id).await;
// TODO don't use collect here.
let members: Vec<UserId> = joined.chain(invited).collect().await;
Ok(
o.share_group_session(room_id, members.iter(), EncryptionSettings::default())
.await?,

View file

@ -5,7 +5,11 @@ use std::{
sync::{Arc, Mutex as SyncMutex},
};
use futures::executor::block_on;
use futures::{
executor::block_on,
future,
stream::{self, Stream, StreamExt},
};
use matrix_sdk_common::{
api::r0::sync::sync_events::RoomSummary as RumaSummary,
events::{
@ -40,8 +44,8 @@ pub struct StateChanges {
state: BTreeMap<RoomId, BTreeMap<String, AnySyncStateEvent>>,
pub room_summaries: BTreeMap<RoomId, InnerSummary>,
// display_names: BTreeMap<RoomId, BTreeMap<String, BTreeMap<UserId, ()>>>,
joined_user_ids: BTreeMap<RoomId, Vec<UserId>>,
invited_user_ids: BTreeMap<RoomId, Vec<UserId>>,
pub joined_user_ids: BTreeMap<RoomId, Vec<UserId>>,
pub invited_user_ids: BTreeMap<RoomId, Vec<UserId>>,
removed_user_ids: BTreeMap<RoomId, UserId>,
}
@ -154,20 +158,18 @@ impl Room {
}
}
pub async fn get_j_members(&self) -> Vec<RoomMember> {
pub async fn get_j_members(&self) -> impl Stream<Item = RoomMember> + '_ {
let joined = self.store.get_joined_user_ids(self.room_id()).await;
let invited = self.store.get_invited_user_ids(self.room_id()).await;
let mut members = Vec::new();
for user in joined.into_iter().chain(invited) {
let x = move |u| async move {
self.store
.get_member_event(self.room_id(), &user)
.get_member_event(self.room_id(), &u)
.await
.map(|e| members.push(e.into()));
}
.map(|m| m.into())
};
members
joined.chain(invited).filter_map(x)
}
/// Calculate the canonical display name of the room, taking into account
@ -194,38 +196,43 @@ impl Room {
let members = self.get_j_members().await;
info!(
"Calculating name for {}, hero count {} members {:#?}",
self.room_id(),
heroes_count,
members
);
// info!(
// "Calculating name for {}, hero count {} members {:#?}",
// self.room_id(),
// heroes_count,
// members
// );
// TODO: This should use `self.heroes` but it is always empty??
//
let own_user_id = self.own_user_id.clone();
let is_own_member = |m: &RoomMember| &m.user_id() == &*own_user_id;
if heroes_count >= invited_joined {
let mut names = members
.iter()
.filter(|m| &m.user_id() != &*self.own_user_id)
.filter(|m| future::ready(is_own_member(m)))
.take(3)
.map(|mem| {
mem.display_name()
.clone()
.unwrap_or_else(|| mem.user_id().localpart().to_string())
})
.collect::<Vec<String>>();
.collect::<Vec<String>>()
.await;
// stabilize ordering
names.sort();
names.join(", ")
} else if heroes_count < invited_joined && invited + joined > 1 {
let mut names = members
.iter()
.filter(|m| &m.user_id() != &*self.own_user_id)
.filter(|m| future::ready(is_own_member(m)))
.take(3)
.map(|mem| {
mem.display_name()
.clone()
.unwrap_or_else(|| mem.user_id().localpart().to_string())
})
.collect::<Vec<String>>();
.collect::<Vec<String>>()
.await;
names.sort();
// TODO: What length does the spec want us to use here and in
@ -241,7 +248,7 @@ impl Room {
(*self.inner.lock().unwrap()).clone()
}
pub async fn joined_user_ids(&self) -> Vec<UserId> {
pub async fn joined_user_ids(&self) -> impl Stream<Item = UserId> {
self.store.get_joined_user_ids(&self.room_id).await
}
@ -251,7 +258,6 @@ impl Room {
pub fn update_summary(&self, summary: InnerSummary) {
let mut inner = self.inner.lock().unwrap();
info!("UPDAGING SUMMARY FOR {} WITH {:#?}", self.room_id, summary);
*inner = summary;
}
@ -263,6 +269,10 @@ impl Room {
&self.room_id
}
pub fn last_prev_batch(&self) -> Option<String> {
self.inner.lock().unwrap().last_prev_batch.clone()
}
pub async fn display_name(&self) -> String {
self.calculate_name().await
}
@ -359,6 +369,10 @@ impl InnerSummary {
}
}
pub fn is_encrypted(&self) -> bool {
self.encryption.is_some()
}
pub(crate) fn update(&mut self, summary: &RumaSummary) -> bool {
let mut changed = false;
@ -425,7 +439,6 @@ impl Store {
}
pub async fn save_changes(&self, changes: &StateChanges) {
info!("SAVING CHANGES OF SIZE {}", std::mem::size_of_val(changes));
let ret: TransactionResult<()> = (
&self.session,
&self.members,
@ -504,18 +517,24 @@ impl Store {
.map(|v| serde_json::from_slice(&v).unwrap())
}
pub async fn get_invited_user_ids(&self, room_id: &RoomId) -> Vec<UserId> {
self.invited_user_ids
.scan_prefix(room_id.as_bytes())
.map(|u| UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap())
.collect()
pub async fn get_invited_user_ids(&self, room_id: &RoomId) -> impl Stream<Item = UserId> {
stream::iter(
self.invited_user_ids
.scan_prefix(room_id.as_bytes())
.map(|u| {
UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap()
}),
)
}
pub async fn get_joined_user_ids(&self, room_id: &RoomId) -> Vec<UserId> {
self.joined_user_ids
.scan_prefix(room_id.as_bytes())
.map(|u| UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap())
.collect()
pub async fn get_joined_user_ids(&self, room_id: &RoomId) -> impl Stream<Item = UserId> {
stream::iter(
self.joined_user_ids
.scan_prefix(room_id.as_bytes())
.map(|u| {
UserId::try_from(String::from_utf8_lossy(&u.unwrap().1).to_string()).unwrap()
}),
)
}
pub fn get_session(&self) -> Option<Session> {