base: Restore getting the user ids when receiving a sync.

master
Damir Jelić 2020-12-14 13:48:29 +01:00
parent 05b1384d16
commit 3a76cf7692
5 changed files with 160 additions and 122 deletions

View File

@ -14,7 +14,7 @@
// limitations under the License.
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
fmt,
path::{Path, PathBuf},
@ -25,12 +25,13 @@ use std::{
use dashmap::DashMap;
#[cfg(feature = "encryption")]
use futures::StreamExt;
use matrix_sdk_common::{
api::r0 as api,
events::{
room::member::MemberEventContent, AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent,
AnySyncStateEvent, EventContent, StateEvent,
presence::PresenceEvent, room::member::{MemberEventContent, MembershipState}, AnyBasicEvent,
AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventContent, StateEvent,
},
identifiers::{RoomId, UserId},
locks::RwLock,
@ -39,13 +40,15 @@ use matrix_sdk_common::{
#[cfg(feature = "encryption")]
use matrix_sdk_common::{
api::r0::keys::claim_keys::Request as KeysClaimRequest,
events::{room::encrypted::EncryptedEventContent, AnyMessageEventContent, AnySyncMessageEvent},
events::{
room::encrypted::EncryptedEventContent,
AnyMessageEventContent, AnySyncMessageEvent,
},
identifiers::DeviceId,
locks::Mutex,
uuid::Uuid,
};
#[cfg(feature = "encryption")]
use matrix_sdk_common::{events::presence::PresenceEvent, locks::Mutex};
#[cfg(feature = "encryption")]
use matrix_sdk_crypto::{
store::{CryptoStore, CryptoStoreError},
Device, EncryptionSettings, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas,
@ -455,18 +458,30 @@ impl BaseClient {
ruma_timeline: api::sync::sync_events::Timeline,
summary: &mut RoomInfo,
changes: &mut StateChanges,
user_ids: &mut BTreeSet<UserId>,
) -> Timeline {
let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone());
let mut members = BTreeMap::new();
for event in ruma_timeline.events {
if let Ok(mut e) = hoist_room_event_prev_content(&event) {
match &mut e {
AnySyncRoomEvent::State(s) => match s {
AnySyncStateEvent::RoomMember(member) => {
if let Ok(member) = MemberEvent::try_from(member.clone()) {
members.insert(member.state_key.clone(), member);
match member.content.membership {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key.clone());
}
_ => {
user_ids.remove(&member.state_key);
}
}
changes
.members
.entry(room_id.clone())
.or_insert_with(BTreeMap::new)
.insert(member.state_key.clone(), member);
}
}
_ => {
@ -474,9 +489,9 @@ impl BaseClient {
changes.add_state_event(room_id, s.clone());
}
},
AnySyncRoomEvent::Message(message) =>
{
#[cfg(feature = "encryption")]
#[cfg(feature = "encryption")]
AnySyncRoomEvent::Message(message) => {
if let AnySyncMessageEvent::RoomEncrypted(encrypted) = message {
if let Some(olm) = self.olm_machine().await {
if let Ok(decrypted) =
@ -553,40 +568,54 @@ impl BaseClient {
State,
BTreeMap<UserId, MemberEvent>,
BTreeMap<String, AnySyncStateEvent>,
BTreeSet<UserId>,
) {
events.into_iter().fold(
(State::default(), BTreeMap::new(), BTreeMap::new()),
|(mut state, mut members, mut state_events), e| {
match hoist_and_deserialize_state_event(&e) {
Ok(e) => {
room_info.handle_state_event(&e);
state.events.push(e.clone());
let mut state = State::default();
let mut members = BTreeMap::new();
let mut state_events = BTreeMap::new();
let mut user_ids = BTreeSet::new();
if let AnySyncStateEvent::RoomMember(member) = e {
match MemberEvent::try_from(member) {
Ok(m) => {
members.insert(m.state_key.clone(), m);
}
Err(e) => warn!(
"Member event in room {} has an invalid state key {:?}",
room_info.room_id, e
),
}
} else {
state_events.insert(e.content().event_type().to_owned(), e);
}
}
let room_id = room_info.room_id.clone();
for event in
events
.into_iter()
.filter_map(|e| match hoist_and_deserialize_state_event(&e) {
Ok(e) => Some(e),
Err(err) => {
warn!(
"Couldn't deserialize state event for room {}: {:?} {:#?}",
room_info.room_id, err, e
room_id, err, e
);
None
}
}
})
{
state.events.push(event.clone());
room_info.handle_state_event(&event);
(state, members, state_events)
},
)
if let AnySyncStateEvent::RoomMember(member) = event {
match MemberEvent::try_from(member) {
Ok(m) => {
match m.content.membership {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(m.state_key.clone());
}
_ => (),
}
members.insert(m.state_key.clone(), m);
}
Err(e) => warn!(
"Member event in room {} has an invalid state key {:?}",
room_info.room_id, e
),
}
} else {
state_events.insert(event.content().event_type().to_owned(), event);
}
}
(state, members, state_events, user_ids)
}
async fn handle_room_account_data(
@ -647,7 +676,7 @@ impl BaseClient {
summary.update(&room_info.summary);
summary.set_prev_batch(room_info.timeline.prev_batch.as_deref());
let (state, members, state_events) =
let (state, members, state_events, mut user_ids) =
self.handle_state(room_info.state.events, &mut summary);
changes.members.insert(room_id.clone(), members);
@ -658,27 +687,37 @@ impl BaseClient {
}
let timeline = self
.handle_timeline(&room_id, room_info.timeline, &mut summary, &mut changes)
.handle_timeline(
&room_id,
room_info.timeline,
&mut summary,
&mut changes,
&mut user_ids,
)
.await;
let account_data = self
.handle_room_account_data(&room_id, &room_info.account_data.events, &mut changes)
.await;
// #[cfg(feature = "encryption")]
// if summary.is_encrypted() {
// // TODO if the room isn't encrypted but the new summary is,
// // add all the room users.
// if let Some(o) = self.olm_machine().await {
// if let Some(users) = changes.joined_user_ids.get(&room_id) {
// o.update_tracked_users(users).await
// }
#[cfg(feature = "encryption")]
if summary.is_encrypted() {
if let Some(o) = self.olm_machine().await {
if !room.is_encrypted() {
// The room turned on encryption in this sync, we need
// to get also all the existing users and mark them for
// tracking.
let joined = self.store.get_joined_user_ids(&room_id).await;
let invited = self.store.get_invited_user_ids(&room_id).await;
// if let Some(users) = changes.invited_user_ids.get(&room_id) {
// o.update_tracked_users(users).await
// }
// }
// }
// TODO don't use collect here.
let user_ids: Vec<UserId> = joined.chain(invited).collect().await;
o.update_tracked_users(&user_ids).await
}
o.update_tracked_users(&user_ids).await
}
}
let notification_count = room_info.unread_notifications.into();
summary.update_notification_count(notification_count);
@ -706,14 +745,20 @@ impl BaseClient {
let mut summary = room.clone_summary();
summary.mark_as_left();
let (state, members, state_events) =
let (state, members, state_events, mut user_ids) =
self.handle_state(room_info.state.events, &mut summary);
changes.members.insert(room_id.clone(), members);
changes.state.insert(room_id.clone(), state_events);
let timeline = self
.handle_timeline(&room_id, room_info.timeline, &mut summary, &mut changes)
.handle_timeline(
&room_id,
room_info.timeline,
&mut summary,
&mut changes,
&mut user_ids,
)
.await;
let account_data = self
@ -817,49 +862,44 @@ impl BaseClient {
let mut summary = room.clone_summary();
summary.mark_members_synced();
let mut changes = StateChanges::default();
let mut members = BTreeMap::new();
let members: BTreeMap<UserId, MemberEvent> =
futures::stream::iter(response.chunk.iter())
.map(|e| {
hoist_member_event(e)
.ok()
.map(|e| MemberEvent::try_from(e).ok())
.flatten()
})
.filter_map(|e| async {
if let Some(e) = e {
if self
.store
.get_member_event(room_id, &e.state_key)
.await
.is_some()
{
Some((e.state_key.clone(), e))
} else {
None
}
} else {
None
#[cfg(feature = "encryption")]
let mut user_ids = BTreeSet::new();
for member in response.chunk.iter().filter_map(|e| {
hoist_member_event(e)
.ok()
.map(|e| MemberEvent::try_from(e).ok())
.flatten()
}) {
if self
.store
.get_member_event(&room_id, &member.state_key)
.await
.is_some()
{
#[cfg(feature = "encryption")]
match member.content.membership {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key.clone());
}
})
.collect()
.await;
_ => (),
}
members.insert(member.state_key.clone(), member);
}
}
let mut changes = StateChanges::default();
changes.members.insert(room_id.clone(), members);
// #[cfg(feature = "encryption")]
// if summary.is_encrypted() {
// if let Some(o) = self.olm_machine().await {
// if let Some(users) = changes.joined_user_ids.get(room_id) {
// o.update_tracked_users(users).await
// }
// if let Some(users) = changes.invited_user_ids.get(room_id) {
// o.update_tracked_users(users).await
// }
// }
// }
#[cfg(feature = "encryption")]
if summary.is_encrypted() {
if let Some(o) = self.olm_machine().await {
o.update_tracked_users(&user_ids).await
}
}
changes.add_room(summary);
@ -987,6 +1027,10 @@ impl BaseClient {
self.rooms.get(room_id).map(|r| r.clone())
}
pub fn get_stripped_room(&self, room_id: &RoomId) -> Option<StrippedRoom> {
self.stripped_rooms.get(room_id).map(|r| r.clone())
}
/// Encrypt a message event content.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]

View File

@ -16,24 +16,23 @@ use std::sync::Arc;
use matrix_sdk_common::{
events::{
presence::PresenceEvent,
room::{member::MemberEventContent, power_levels::PowerLevelsEventContent},
SyncStateEvent,
presence::PresenceEvent, room::power_levels::PowerLevelsEventContent, SyncStateEvent,
},
identifiers::UserId,
};
use crate::responses::MemberEvent;
#[derive(Clone, Debug)]
pub struct RoomMember {
pub(crate) user_id: Arc<UserId>,
pub(crate) event: Arc<SyncStateEvent<MemberEventContent>>,
pub(crate) event: Arc<MemberEvent>,
pub(crate) presence: Arc<Option<PresenceEvent>>,
pub(crate) power_levles: Arc<Option<SyncStateEvent<PowerLevelsEventContent>>>,
}
impl RoomMember {
pub fn user_id(&self) -> &UserId {
&self.user_id
&self.event.state_key
}
pub fn display_name(&self) -> Option<&str> {

View File

@ -109,7 +109,6 @@ impl Room {
.get_member_event(self.room_id(), &u)
.await
.map(|m| RoomMember {
user_id: UserId::try_from(m.state_key.as_str()).unwrap().into(),
event: m.into(),
presence: presence.into(),
power_levles: power.into(),
@ -247,7 +246,6 @@ impl Room {
.get_member_event(&self.room_id, user_id)
.await
.map(|e| RoomMember {
user_id: UserId::try_from(e.state_key.as_str()).unwrap().into(),
event: e.into(),
presence: presence.into(),
power_levles: power.into(),

View File

@ -172,7 +172,7 @@ pub struct StrippedRoomInfo {
}
impl StrippedRoomInfo {
pub fn handle_state_event(&mut self, event: &AnyStrippedStateEvent) -> bool {
pub(crate) fn handle_state_event(&mut self, event: &AnyStrippedStateEvent) -> bool {
match event {
AnyStrippedStateEvent::RoomEncryption(encryption) => {
self.encryption = Some(encryption.content.clone());
@ -193,12 +193,4 @@ impl StrippedRoomInfo {
_ => false,
}
}
pub fn is_encrypted(&self) -> bool {
self.encryption.is_some()
}
pub fn serialize(&self) -> Vec<u8> {
serde_json::to_vec(&self).unwrap()
}
}

View File

@ -3,10 +3,8 @@ use std::{collections::BTreeMap, convert::TryFrom, path::Path, time::SystemTime}
use futures::stream::{self, Stream};
use matrix_sdk_common::{
events::{
presence::PresenceEvent,
room::member::{MemberEventContent, MembershipState},
AnyBasicEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
SyncStateEvent,
presence::PresenceEvent, room::member::MembershipState, AnyBasicEvent,
AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
},
identifiers::{RoomId, UserId},
};
@ -40,12 +38,14 @@ pub struct Store {
#[derive(Debug, Default)]
pub struct StateChanges {
pub session: Option<Session>,
pub account_data: BTreeMap<String, AnyBasicEvent>,
pub presence: BTreeMap<UserId, PresenceEvent>,
pub members: BTreeMap<RoomId, BTreeMap<UserId, MemberEvent>>,
pub state: BTreeMap<RoomId, BTreeMap<String, AnySyncStateEvent>>,
pub account_data: BTreeMap<String, AnyBasicEvent>,
pub room_account_data: BTreeMap<RoomId, BTreeMap<String, AnyBasicEvent>>,
pub room_infos: BTreeMap<RoomId, RoomInfo>,
pub presence: BTreeMap<UserId, PresenceEvent>,
pub stripped_state: BTreeMap<RoomId, BTreeMap<String, AnyStrippedStateEvent>>,
pub stripped_members: BTreeMap<RoomId, BTreeMap<UserId, StrippedMemberEvent>>,
pub invited_room_info: BTreeMap<RoomId, RoomInfo>,
@ -329,7 +329,7 @@ impl Store {
&self,
room_id: &RoomId,
state_key: &UserId,
) -> Option<SyncStateEvent<MemberEventContent>> {
) -> Option<MemberEvent> {
self.members
.get(format!("{}{}", room_id.as_str(), state_key.as_str()))
.unwrap()
@ -386,7 +386,7 @@ mod test {
use matrix_sdk_test::async_test;
use super::{StateChanges, Store};
use crate::Session;
use crate::{responses::MemberEvent, Session};
fn user_id() -> UserId {
user_id!("@example:localhost")
@ -396,7 +396,7 @@ mod test {
"DEVICEID".into()
}
fn membership_event() -> SyncStateEvent<MemberEventContent> {
fn membership_event() -> MemberEvent {
let content = MemberEventContent {
avatar_url: None,
displayname: None,
@ -405,12 +405,12 @@ mod test {
membership: MembershipState::Join,
};
SyncStateEvent {
MemberEvent {
event_id: EventId::try_from("$h29iv0s8:example.com").unwrap(),
content,
sender: user_id(),
origin_server_ts: SystemTime::now(),
state_key: user_id().to_string(),
state_key: user_id(),
prev_content: None,
unsigned: Unsigned::default(),
}
@ -439,7 +439,12 @@ mod test {
let user_id = user_id();
assert!(store.get_member_event(&room_id, &user_id).await.is_none());
let changes = StateChanges::from_event(&room_id!("!test:localhost"), membership_event());
let mut changes = StateChanges::default();
changes
.members
.entry(room_id.clone())
.or_default()
.insert(user_id.clone(), membership_event());
store.save_changes(&changes).await;
assert!(store.get_member_event(&room_id, &user_id).await.is_some());