base: Allow users to inspect the ambiguity change a member event triggers

master
Damir Jelić 2021-01-27 11:42:13 +01:00
parent 55430dd3d2
commit 094ead9d7d
6 changed files with 416 additions and 103 deletions

View File

@ -40,8 +40,8 @@ use tracing::{debug, warn};
use tracing::{error, info, instrument};
use matrix_sdk_base::{
deserialized_responses::SyncResponse, BaseClient, BaseClientConfig, EventEmitter, InvitedRoom,
JoinedRoom, LeftRoom, Session, Store,
deserialized_responses::{MembersResponse, SyncResponse},
BaseClient, BaseClientConfig, EventEmitter, InvitedRoom, JoinedRoom, LeftRoom, Session, Store,
};
#[cfg(feature = "encryption")]
@ -72,8 +72,7 @@ use matrix_sdk_common::{
filter::{create_filter::Request as FilterUploadRequest, FilterDefinition},
media::create_content,
membership::{
ban_user, forget_room,
get_member_events::{self, Response as MembersResponse},
ban_user, forget_room, get_member_events,
invite_user::{self, InvitationRecipient},
join_room_by_id, join_room_by_id_or_alias, kick_user, leave_room, Invite3pid,
},
@ -1614,9 +1613,7 @@ impl Client {
let request = get_member_events::Request::new(room_id);
let response = self.send(request).await?;
self.base_client.receive_members(room_id, &response).await?;
Ok(response)
Ok(self.base_client.receive_members(room_id, &response).await?)
}
/// Synchronize the client's state with the latest state on the server.

View File

@ -25,8 +25,9 @@ use std::{
use matrix_sdk_common::{
api::r0 as api,
deserialized_responses::{
AccountData, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, MemberEvent,
Presence, Rooms, State, StrippedMemberEvent, SyncResponse, Timeline,
AccountData, AmbiguityChanges, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
MemberEvent, MembersResponse, Presence, Rooms, State, StrippedMemberEvent, SyncResponse,
Timeline,
},
events::{
presence::PresenceEvent,
@ -61,7 +62,7 @@ use crate::{
event_emitter::Emitter,
rooms::{RoomInfo, RoomType, StrippedRoomInfo},
session::Session,
store::{Result as StoreResult, StateChanges, Store},
store::{ambiguity_map::AmbiguityCache, Result as StoreResult, StateChanges, Store},
EventEmitter, RoomState,
};
@ -443,8 +444,9 @@ impl BaseClient {
ruma_timeline: api::sync::sync_events::Timeline,
room_info: &mut RoomInfo,
changes: &mut StateChanges,
ambiguity_cache: &mut AmbiguityCache,
user_ids: &mut BTreeSet<UserId>,
) -> Timeline {
) -> StoreResult<Timeline> {
let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone());
for event in ruma_timeline.events {
@ -454,6 +456,10 @@ impl BaseClient {
AnySyncRoomEvent::State(s) => match s {
AnySyncStateEvent::RoomMember(member) => {
if let Ok(member) = MemberEvent::try_from(member.clone()) {
ambiguity_cache
.handle_event(changes, room_id, &member)
.await?;
match member.content.membership {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key.clone());
@ -519,7 +525,7 @@ impl BaseClient {
}
}
timeline
Ok(timeline)
}
#[allow(clippy::type_complexity)]
@ -569,18 +575,13 @@ impl BaseClient {
)
}
#[allow(clippy::type_complexity)]
fn handle_state(
async fn handle_state(
&self,
changes: &mut StateChanges,
ambiguity_cache: &mut AmbiguityCache,
events: Vec<Raw<AnySyncStateEvent>>,
room_info: &mut RoomInfo,
) -> (
State,
BTreeMap<UserId, MemberEvent>,
BTreeMap<UserId, MemberEventContent>,
BTreeMap<String, BTreeMap<String, AnySyncStateEvent>>,
BTreeSet<UserId>,
) {
) -> StoreResult<(State, BTreeSet<UserId>)> {
let mut state = State::default();
let mut members = BTreeMap::new();
let mut state_events = BTreeMap::new();
@ -609,6 +610,8 @@ impl BaseClient {
if let AnySyncStateEvent::RoomMember(member) = event {
match MemberEvent::try_from(member) {
Ok(m) => {
ambiguity_cache.handle_event(changes, &room_id, &m).await?;
match m.content.membership {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(m.state_key.clone());
@ -639,7 +642,11 @@ impl BaseClient {
}
}
(state, members, profiles, state_events, user_ids)
changes.members.insert(room_id.as_ref().clone(), members);
changes.profiles.insert(room_id.as_ref().clone(), profiles);
changes.state.insert(room_id.as_ref().clone(), state_events);
Ok((state, user_ids))
}
async fn handle_room_account_data(
@ -738,6 +745,8 @@ impl BaseClient {
.into();
let mut changes = StateChanges::new(response.next_batch.clone());
let mut ambiguity_cache = AmbiguityCache::new(self.store.clone());
let mut rooms = Rooms::default();
for (room_id, new_info) in response.rooms.join {
@ -751,12 +760,14 @@ impl BaseClient {
room_info.update_summary(&new_info.summary);
room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
let (state, members, profiles, state_events, mut user_ids) =
self.handle_state(new_info.state.events, &mut room_info);
changes.members.insert(room_id.clone(), members);
changes.profiles.insert(room_id.clone(), profiles);
changes.state.insert(room_id.clone(), state_events);
let (state, mut user_ids) = self
.handle_state(
&mut changes,
&mut ambiguity_cache,
new_info.state.events,
&mut room_info,
)
.await?;
if new_info.timeline.limited {
room_info.mark_members_missing();
@ -768,9 +779,10 @@ impl BaseClient {
new_info.timeline,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
&mut user_ids,
)
.await;
.await?;
let account_data = self
.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes)
@ -797,7 +809,6 @@ impl BaseClient {
let notification_count = new_info.unread_notifications.into();
room_info.update_notification_count(notification_count);
// TODO should we store this?
let ephemeral = Ephemeral {
events: new_info
.ephemeral
@ -823,12 +834,14 @@ impl BaseClient {
let mut room_info = room.clone_info();
room_info.mark_as_left();
let (state, members, profiles, state_events, mut user_ids) =
self.handle_state(new_info.state.events, &mut room_info);
changes.members.insert(room_id.clone(), members);
changes.profiles.insert(room_id.clone(), profiles);
changes.state.insert(room_id.clone(), state_events);
let (state, mut user_ids) = self
.handle_state(
&mut changes,
&mut ambiguity_cache,
new_info.state.events,
&mut room_info,
)
.await?;
let timeline = self
.handle_timeline(
@ -836,9 +849,10 @@ impl BaseClient {
new_info.timeline,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
&mut user_ids,
)
.await;
.await?;
let account_data = self
.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes)
@ -893,6 +907,8 @@ impl BaseClient {
self.handle_account_data(response.account_data.events, &mut changes)
.await;
changes.ambiguity_maps = ambiguity_cache.cache;
self.store.save_changes(&changes).await?;
*self.sync_token.write().await = Some(response.next_batch.clone());
self.apply_changes(&changes).await;
@ -915,6 +931,9 @@ impl BaseClient {
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
ambiguity_changes: AmbiguityChanges {
changes: ambiguity_cache.changes,
},
};
if let Some(emitter) = self.event_emitter.read().await.as_ref() {
@ -936,21 +955,28 @@ impl BaseClient {
&self,
room_id: &RoomId,
response: &api::membership::get_member_events::Response,
) -> Result<()> {
) -> Result<MembersResponse> {
let members: Vec<MemberEvent> = response
.chunk
.iter()
.filter_map(|e| {
hoist_member_event(e)
.ok()
.and_then(|e| MemberEvent::try_from(e).ok())
})
.collect();
let mut ambiguity_cache = AmbiguityCache::new(self.store.clone());
if let Some(room) = self.store.get_bare_room(room_id) {
let mut room_info = room.clone_info();
room_info.mark_members_synced();
let mut members = BTreeMap::new();
let mut changes = StateChanges::default();
#[cfg(feature = "encryption")]
let mut user_ids = BTreeSet::new();
for member in response.chunk.iter().filter_map(|e| {
hoist_member_event(e)
.ok()
.and_then(|e| MemberEvent::try_from(e).ok())
}) {
for member in &members {
if self
.store
.get_member_event(&room_id, &member.state_key)
@ -965,12 +991,25 @@ impl BaseClient {
_ => (),
}
members.insert(member.state_key.clone(), member);
}
ambiguity_cache
.handle_event(&changes, room_id, &member)
.await?;
if member.state_key == member.sender {
changes
.profiles
.entry(room_id.clone())
.or_insert_with(BTreeMap::new)
.insert(member.sender.clone(), member.content.clone());
}
let mut changes = StateChanges::default();
changes.members.insert(room_id.clone(), members);
changes
.members
.entry(room_id.clone())
.or_insert_with(BTreeMap::new)
.insert(member.state_key.clone(), member.clone());
}
}
#[cfg(feature = "encryption")]
if room_info.is_encrypted() {
@ -979,13 +1018,19 @@ impl BaseClient {
}
}
changes.ambiguity_maps = ambiguity_cache.cache;
changes.add_room(room_info);
self.store.save_changes(&changes).await?;
self.apply_changes(&changes).await;
}
Ok(())
Ok(MembersResponse {
chunk: members,
ambiguity_changes: AmbiguityChanges {
changes: ambiguity_cache.changes,
},
})
}
pub async fn receive_filter_upload(

View File

@ -0,0 +1,286 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, BTreeSet};
use matrix_sdk_common::{
deserialized_responses::{AmbiguityChange, MemberEvent},
events::room::member::MembershipState,
identifiers::{EventId, RoomId, UserId},
};
use tracing::trace;
use crate::Store;
use super::{Result, StateChanges};
#[derive(Clone, Debug)]
pub struct AmbiguityCache {
pub store: Store,
pub cache: BTreeMap<RoomId, BTreeMap<String, BTreeSet<UserId>>>,
pub changes: BTreeMap<RoomId, BTreeMap<EventId, AmbiguityChange>>,
}
#[derive(Clone, Debug)]
struct AmbiguityMap {
display_name: String,
users: BTreeSet<UserId>,
}
impl AmbiguityMap {
fn remove(&mut self, user_id: &UserId) -> Option<UserId> {
self.users.remove(user_id);
if self.user_count() == 1 {
self.users.iter().next().cloned()
} else {
None
}
}
fn add(&mut self, user_id: UserId) -> Option<UserId> {
let ambiguous_user = if self.user_count() == 1 {
self.users.iter().next().cloned()
} else {
None
};
self.users.insert(user_id);
ambiguous_user
}
fn user_count(&self) -> usize {
self.users.len()
}
fn is_ambiguous(&self) -> bool {
self.user_count() > 1
}
}
impl AmbiguityCache {
pub fn new(store: Store) -> Self {
Self {
store,
cache: BTreeMap::new(),
changes: BTreeMap::new(),
}
}
pub async fn handle_event(
&mut self,
changes: &StateChanges,
room_id: &RoomId,
member_event: &MemberEvent,
) -> Result<()> {
// Synapse seems to have a bug where it puts the same event into the
// state and the timeline sometimes.
//
// Since our state, e.g. the old display name, already ended up inside
// the state changes and we're pulling stuff out of the cache if it's
// there calculating this twice for the same event will result in an
// incorrect AmbiguityChange overwriting the correct one. In other
// words, this method is not idempotent so we make it by ignoring
// duplicate events.
if self
.changes
.get(room_id)
.map(|c| c.contains_key(&member_event.event_id))
.unwrap_or(false)
{
return Ok(());
}
let (mut old_map, mut new_map) = self.get(changes, room_id, member_event).await?;
let display_names_same = match (&old_map, &new_map) {
(Some(a), Some(b)) => a.display_name == b.display_name,
_ => false,
};
if display_names_same {
return Ok(());
}
let disambiguated_member = old_map
.as_mut()
.and_then(|o| o.remove(&member_event.state_key));
let ambiguated_member = new_map
.as_mut()
.and_then(|n| n.add(member_event.state_key.clone()));
let ambiguous = new_map.as_ref().map(|n| n.is_ambiguous()).unwrap_or(false);
self.update(room_id, old_map, new_map);
let change = AmbiguityChange {
disambiguated_member,
ambiguated_member,
member_ambiguous: ambiguous,
};
trace!(
"Handling display name ambiguity for {}: {:#?}",
member_event.state_key,
change
);
self.add_change(room_id, member_event.event_id.clone(), change);
Ok(())
}
fn update(
&mut self,
room_id: &RoomId,
old_map: Option<AmbiguityMap>,
new_map: Option<AmbiguityMap>,
) {
let entry = self
.cache
.entry(room_id.clone())
.or_insert_with(BTreeMap::new);
if let Some(old) = old_map {
entry.insert(old.display_name, old.users);
}
if let Some(new) = new_map {
entry.insert(new.display_name, new.users);
}
}
fn add_change(&mut self, room_id: &RoomId, event_id: EventId, change: AmbiguityChange) {
self.changes
.entry(room_id.clone())
.or_insert_with(BTreeMap::new)
.insert(event_id, change);
}
async fn get(
&mut self,
changes: &StateChanges,
room_id: &RoomId,
member_event: &MemberEvent,
) -> Result<(Option<AmbiguityMap>, Option<AmbiguityMap>)> {
use MembershipState::*;
let old_event = if let Some(m) = changes
.members
.get(room_id)
.and_then(|m| m.get(&member_event.state_key))
{
Some(m.clone())
} else if let Some(m) = self
.store
.get_member_event(room_id, &member_event.state_key)
.await?
{
Some(m)
} else {
None
};
let old_display_name = if let Some(event) = old_event {
if matches!(event.content.membership, Join | Invite) {
let dispaly_name = if let Some(d) = changes
.profiles
.get(room_id)
.and_then(|p| p.get(&member_event.state_key))
.and_then(|p| p.displayname.as_deref())
{
Some(d.to_string())
} else if let Some(d) = self
.store
.get_profile(room_id, &member_event.state_key)
.await?
.and_then(|c| c.displayname)
{
Some(d)
} else {
event.content.displayname.clone()
};
Some(dispaly_name.unwrap_or_else(|| event.state_key.localpart().to_string()))
} else {
None
}
} else {
None
};
let old_map = if let Some(old_name) = old_display_name.as_deref() {
let old_display_name_map = if let Some(u) = self
.cache
.entry(room_id.clone())
.or_insert_with(BTreeMap::new)
.get(old_name)
{
u.clone()
} else {
self.store
.get_users_with_display_name(&room_id, &old_name)
.await?
};
Some(AmbiguityMap {
display_name: old_name.to_string(),
users: old_display_name_map,
})
} else {
None
};
let new_map = if matches!(member_event.content.membership, Join | Invite) {
let new = member_event
.content
.displayname
.as_deref()
.unwrap_or_else(|| member_event.state_key.localpart());
// We don't allow other users to set the display name, so if we have
// a more trusted version of the display name use that.
let new_display_name = if member_event.sender.as_str() == member_event.state_key {
new
} else if let Some(old) = old_display_name.as_deref() {
old
} else {
new
};
let new_display_name_map = if let Some(u) = self
.cache
.entry(room_id.clone())
.or_insert_with(BTreeMap::new)
.get(new_display_name)
{
u.clone()
} else {
self.store
.get_users_with_display_name(&room_id, &new_display_name)
.await?
};
Some(AmbiguityMap {
display_name: new_display_name.to_string(),
users: new_display_name_map,
})
} else {
None
};
Ok((old_map, new_map))
}
}

View File

@ -39,6 +39,7 @@ use crate::{
InvitedRoom, JoinedRoom, LeftRoom, Room, RoomState, Session,
};
pub(crate) mod ambiguity_map;
mod memory_store;
#[cfg(feature = "sled_state_store")]
mod sled_store;
@ -264,6 +265,7 @@ pub struct StateChanges {
pub members: BTreeMap<RoomId, BTreeMap<UserId, MemberEvent>>,
pub profiles: BTreeMap<RoomId, BTreeMap<UserId, MemberEventContent>>,
pub ambiguity_maps: BTreeMap<RoomId, BTreeMap<String, BTreeSet<UserId>>>,
pub state: BTreeMap<RoomId, BTreeMap<String, BTreeMap<String, AnySyncStateEvent>>>,
pub room_account_data: BTreeMap<RoomId, BTreeMap<String, AnyBasicEvent>>,
pub room_infos: BTreeMap<RoomId, RoomInfo>,

View File

@ -320,63 +320,16 @@ impl SledStore {
for event in events.values() {
let key = (room.as_str(), event.state_key.as_str()).encode();
let old_profile: Option<MemberEventContent> = if let Some(p) = profiles
.get(key.as_slice())?
.map(|p| self.deserialize_event(&p))
.transpose()
.map_err(ConflictableTransactionError::Abort)?
{
p
} else {
members
.get(key.as_slice())?
.map(|m| self.deserialize_event::<MemberEvent>(&m))
.transpose()
.map_err(ConflictableTransactionError::Abort)?
.map(|m| m.content)
};
let old_display_name = old_profile
.map(|m| {
m.displayname
.unwrap_or_else(|| event.state_key.localpart().to_string())
})
.unwrap_or_else(|| event.state_key.localpart().to_string());
let old_display_name_key = (
room.as_str(),
old_display_name.as_str(),
event.state_key.as_str(),
)
.encode();
let display_name = profile_changes
.and_then(|p| p.get(&event.state_key))
.as_ref()
.map(|m| m.displayname.as_deref())
.unwrap_or_else(|| Some(event.state_key.localpart()))
.unwrap_or_else(|| event.state_key.localpart());
let display_name_key =
(room.as_str(), display_name, event.state_key.as_str()).encode();
match event.content.membership {
MembershipState::Join => {
joined.insert(key.as_slice(), event.state_key.as_str())?;
invited.remove(key.as_slice())?;
display_names.remove(old_display_name_key)?;
display_names
.insert(display_name_key, event.state_key.as_str())?;
}
MembershipState::Invite => {
invited.insert(key.as_slice(), event.state_key.as_str())?;
joined.remove(key.as_slice())?;
display_names.remove(old_display_name_key)?;
display_names
.insert(display_name_key, event.state_key.as_str())?;
}
_ => {
display_names.remove(old_display_name_key)?;
joined.remove(key.as_slice())?;
invited.remove(key.as_slice())?;
}
@ -400,6 +353,16 @@ impl SledStore {
}
}
for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
for (display_name, map) in ambiguity_maps {
display_names.insert(
(room_id.as_str(), display_name.as_str()).encode(),
self.serialize_event(&map)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
}
for (event_type, event) in &changes.account_data {
account_data.insert(
event_type.as_str().encode(),
@ -593,13 +556,12 @@ impl SledStore {
) -> Result<BTreeSet<UserId>> {
let key = (room_id.as_str(), display_name).encode();
self.display_names
.scan_prefix(key)
.map(|u| {
UserId::try_from(String::from_utf8_lossy(&u?.1).to_string())
.map_err(StoreError::Identifier)
})
.collect()
Ok(self
.display_names
.get(key)?
.map(|m| self.deserialize_event(&m))
.transpose()?
.unwrap_or_default())
}
}

View File

@ -13,6 +13,18 @@ use super::{
identifiers::{DeviceKeyAlgorithm, EventId, RoomId, UserId},
};
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct AmbiguityChange {
pub member_ambiguous: bool,
pub disambiguated_member: Option<UserId>,
pub ambiguated_member: Option<UserId>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct AmbiguityChanges {
pub changes: BTreeMap<RoomId, BTreeMap<EventId, AmbiguityChange>>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct SyncResponse {
/// The batch token to supply in the `since` param of the next `/sync` request.
@ -32,6 +44,8 @@ pub struct SyncResponse {
/// For each key algorithm, the number of unclaimed one-time keys
/// currently held on the server for a device.
pub device_one_time_keys_count: BTreeMap<DeviceKeyAlgorithm, u64>,
/// Collection of ambiguioty changes that room member events trigger.
pub ambiguity_changes: AmbiguityChanges,
}
impl SyncResponse {
@ -305,3 +319,10 @@ impl Into<StrippedStateEvent<MemberEventContent>> for StrippedMemberEvent {
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct MembersResponse {
pub chunk: Vec<MemberEvent>,
/// Collection of ambiguioty changes that room member events trigger.
pub ambiguity_changes: AmbiguityChanges,
}