base: Upcast member events so the state key is an user id.

master
Damir Jelić 2020-12-12 21:44:53 +01:00
parent e245599913
commit 05b1384d16
5 changed files with 331 additions and 236 deletions

View File

@ -14,25 +14,25 @@
// limitations under the License. // limitations under the License.
use std::{ use std::{
collections::BTreeMap,
convert::TryFrom, convert::TryFrom,
fmt, fmt,
path::{Path, PathBuf}, path::{Path, PathBuf},
result::Result as StdResult, result::Result as StdResult,
sync::Arc, sync::Arc,
time::SystemTime,
}; };
use dashmap::DashMap; use dashmap::DashMap;
use futures::StreamExt; use futures::StreamExt;
#[cfg(feature = "encryption")]
use matrix_sdk_common::locks::Mutex;
use matrix_sdk_common::{ use matrix_sdk_common::{
api::r0 as api, api::r0 as api,
events::{ events::{
room::member::MemberEventContent, AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent, room::member::MemberEventContent, AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent,
AnySyncStateEvent, StateEvent, SyncStateEvent, AnySyncStateEvent, EventContent, StateEvent,
}, },
identifiers::{room_id, RoomId, UserId}, identifiers::{RoomId, UserId},
locks::RwLock, locks::RwLock,
Raw, Raw,
}; };
@ -44,21 +44,23 @@ use matrix_sdk_common::{
uuid::Uuid, uuid::Uuid,
}; };
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
use matrix_sdk_common::{events::presence::PresenceEvent, locks::Mutex};
#[cfg(feature = "encryption")]
use matrix_sdk_crypto::{ use matrix_sdk_crypto::{
store::{CryptoStore, CryptoStoreError}, store::{CryptoStore, CryptoStoreError},
Device, EncryptionSettings, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas, Device, EncryptionSettings, IncomingResponse, OlmError, OlmMachine, OutgoingRequest, Sas,
ToDeviceRequest, UserDevices, ToDeviceRequest, UserDevices,
}; };
use tracing::{error, info, warn}; use tracing::{info, warn};
use zeroize::Zeroizing; use zeroize::Zeroizing;
use crate::{ use crate::{
error::Result, error::Result,
responses::{ responses::{
AccountData, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, Presence, Rooms, AccountData, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, MemberEvent,
State, SyncResponse, Timeline, Presence, Rooms, State, StrippedMemberEvent, SyncResponse, Timeline,
}, },
rooms::{Room, RoomInfo, RoomType, StrippedRoom}, rooms::{Room, RoomInfo, RoomType, StrippedRoom, StrippedRoomInfo},
session::Session, session::Session,
store::{StateChanges, Store}, store::{StateChanges, Store},
}; };
@ -148,27 +150,6 @@ fn hoist_room_event_prev_content(
Ok(ev) Ok(ev)
} }
fn handle_membership(
changes: &mut StateChanges,
room_id: &RoomId,
event: &SyncStateEvent<MemberEventContent>,
) {
use matrix_sdk_common::events::room::member::MembershipState::*;
match &event.content.membership {
Join => {
info!("ADDING MEMBER {} to {}", event.state_key, room_id);
changes.add_joined_member(room_id, event.clone())
// TODO check if the display name is
// ambigous
}
Invite => {
info!("ADDING INVITED MEMBER {} to {}", event.state_key, room_id);
changes.add_invited_member(room_id, event.clone())
}
membership => info!("UNHANDLED MEMBERSHIP {} {:?}", event.state_key, membership),
}
}
/// Signals to the `BaseClient` which `RoomState` to send to `EventEmitter`. /// Signals to the `BaseClient` which `RoomState` to send to `EventEmitter`.
#[derive(Debug)] #[derive(Debug)]
pub enum RoomStateType { pub enum RoomStateType {
@ -471,18 +452,22 @@ impl BaseClient {
async fn handle_timeline( async fn handle_timeline(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
ruma_timeline: &api::sync::sync_events::Timeline, ruma_timeline: api::sync::sync_events::Timeline,
summary: &mut RoomInfo, summary: &mut RoomInfo,
mut changes: &mut StateChanges, changes: &mut StateChanges,
) -> Timeline { ) -> Timeline {
let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone()); let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone());
for event in &ruma_timeline.events { let mut members = BTreeMap::new();
if let Ok(mut e) = hoist_room_event_prev_content(event) {
for event in ruma_timeline.events {
if let Ok(mut e) = hoist_room_event_prev_content(&event) {
match &mut e { match &mut e {
AnySyncRoomEvent::State(s) => match s { AnySyncRoomEvent::State(s) => match s {
AnySyncStateEvent::RoomMember(member) => { AnySyncStateEvent::RoomMember(member) => {
handle_membership(&mut changes, room_id, member); if let Ok(member) = MemberEvent::try_from(member.clone()) {
members.insert(member.state_key.clone(), member);
}
} }
_ => { _ => {
summary.handle_state_event(&s); summary.handle_state_event(&s);
@ -517,39 +502,91 @@ impl BaseClient {
timeline timeline
} }
async fn handle_state( fn handle_invited_state(
&self, &self,
room_id: &RoomId, events: Vec<Raw<AnyStrippedStateEvent>>,
events: &[Raw<AnySyncStateEvent>], room_info: &mut StrippedRoomInfo,
summary: &mut RoomInfo, ) -> (
mut changes: &mut StateChanges, InviteState,
) -> State { BTreeMap<UserId, StrippedMemberEvent>,
let mut state = State::default(); BTreeMap<String, AnyStrippedStateEvent>,
) {
events.into_iter().fold(
(InviteState::default(), BTreeMap::new(), BTreeMap::new()),
|(mut state, mut members, mut state_events), e| {
match e.deserialize() {
Ok(e) => {
state.events.push(e.clone());
for e in events { if let AnyStrippedStateEvent::RoomMember(member) = e {
match hoist_and_deserialize_state_event(e) { match StrippedMemberEvent::try_from(member) {
Ok(event) => { Ok(m) => {
match &event { members.insert(m.state_key.clone(), m);
AnySyncStateEvent::RoomMember(member) => { }
handle_membership(&mut changes, room_id, member); Err(e) => warn!(
} "Stripped member event in room {} has an invalid state key {:?}",
e => { room_info.room_id, e
summary.handle_state_event(&e); ),
}
changes.add_state_event(room_id, e.clone()); } else {
room_info.handle_state_event(&e);
state_events.insert(e.content().event_type().to_owned(), e);
} }
} }
Err(err) => {
state.events.push(event); warn!(
"Couldn't deserialize stripped state event for room {}: {:?}",
room_info.room_id, err
);
}
} }
Err(err) => warn!( (state, members, state_events)
"Couldn't deserialize state event for room {}: {:?} {:#?}", },
room_id, err, e )
), }
}
}
state fn handle_state(
&self,
events: Vec<Raw<AnySyncStateEvent>>,
room_info: &mut RoomInfo,
) -> (
State,
BTreeMap<UserId, MemberEvent>,
BTreeMap<String, AnySyncStateEvent>,
) {
events.into_iter().fold(
(State::default(), BTreeMap::new(), BTreeMap::new()),
|(mut state, mut members, mut state_events), e| {
match hoist_and_deserialize_state_event(&e) {
Ok(e) => {
room_info.handle_state_event(&e);
state.events.push(e.clone());
if let AnySyncStateEvent::RoomMember(member) = e {
match MemberEvent::try_from(member) {
Ok(m) => {
members.insert(m.state_key.clone(), m);
}
Err(e) => warn!(
"Member event in room {} has an invalid state key {:?}",
room_info.room_id, e
),
}
} else {
state_events.insert(e.content().event_type().to_owned(), e);
}
}
Err(err) => {
warn!(
"Couldn't deserialize state event for room {}: {:?} {:#?}",
room_info.room_id, err, e
);
}
}
(state, members, state_events)
},
)
} }
async fn handle_room_account_data( async fn handle_room_account_data(
@ -584,6 +621,8 @@ impl BaseClient {
return Ok(SyncResponse::new(response.next_batch)); return Ok(SyncResponse::new(response.next_batch));
} }
let now = SystemTime::now();
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
{ {
let olm = self.olm.lock().await; let olm = self.olm.lock().await;
@ -608,41 +647,38 @@ impl BaseClient {
summary.update(&room_info.summary); summary.update(&room_info.summary);
summary.set_prev_batch(room_info.timeline.prev_batch.as_deref()); summary.set_prev_batch(room_info.timeline.prev_batch.as_deref());
let state = self let (state, members, state_events) =
.handle_state( self.handle_state(room_info.state.events, &mut summary);
&room_id,
&room_info.state.events, changes.members.insert(room_id.clone(), members);
&mut summary, changes.state.insert(room_id.clone(), state_events);
&mut changes,
) if room_info.timeline.limited {
.await; summary.mark_members_missing();
}
let timeline = self let timeline = self
.handle_timeline(&room_id, &room_info.timeline, &mut summary, &mut changes) .handle_timeline(&room_id, room_info.timeline, &mut summary, &mut changes)
.await; .await;
let account_data = self let account_data = self
.handle_room_account_data(&room_id, &room_info.account_data.events, &mut changes) .handle_room_account_data(&room_id, &room_info.account_data.events, &mut changes)
.await; .await;
#[cfg(feature = "encryption")] // #[cfg(feature = "encryption")]
if summary.is_encrypted() { // if summary.is_encrypted() {
// TODO if the room isn't encrypted but the new summary is, // // TODO if the room isn't encrypted but the new summary is,
// add all the room users. // // add all the room users.
if let Some(o) = self.olm_machine().await { // if let Some(o) = self.olm_machine().await {
if let Some(users) = changes.joined_user_ids.get(&room_id) { // if let Some(users) = changes.joined_user_ids.get(&room_id) {
o.update_tracked_users(users).await // o.update_tracked_users(users).await
} // }
if let Some(users) = changes.invited_user_ids.get(&room_id) { // if let Some(users) = changes.invited_user_ids.get(&room_id) {
o.update_tracked_users(users).await // o.update_tracked_users(users).await
} // }
} // }
} // }
if room_info.timeline.limited {
summary.mark_members_missing();
}
let notification_count = room_info.unread_notifications.into(); let notification_count = room_info.unread_notifications.into();
summary.update_notification_count(notification_count); summary.update_notification_count(notification_count);
@ -670,17 +706,14 @@ impl BaseClient {
let mut summary = room.clone_summary(); let mut summary = room.clone_summary();
summary.mark_as_left(); summary.mark_as_left();
let state = self let (state, members, state_events) =
.handle_state( self.handle_state(room_info.state.events, &mut summary);
&room_id,
&room_info.state.events, changes.members.insert(room_id.clone(), members);
&mut summary, changes.state.insert(room_id.clone(), state_events);
&mut changes,
)
.await;
let timeline = self let timeline = self
.handle_timeline(&room_id, &room_info.timeline, &mut summary, &mut changes) .handle_timeline(&room_id, room_info.timeline, &mut summary, &mut changes)
.await; .await;
let account_data = self let account_data = self
@ -700,26 +733,14 @@ impl BaseClient {
changes.add_room(room_info); changes.add_room(room_info);
} }
let mut state = InviteState::default();
let room = self.get_or_create_stripped_room(&room_id).await; let room = self.get_or_create_stripped_room(&room_id).await;
let mut room_info = room.clone_summary(); let mut room_info = room.clone_summary();
for event in &invited.invite_state.events { let (state, members, state_events) =
if let Ok(e) = event.deserialize() { self.handle_invited_state(invited.invite_state.events, &mut room_info);
match &e {
AnyStrippedStateEvent::RoomMember(member) => {
changes.add_stripped_member(&room_id, member.clone());
}
_ => {
room_info.handle_state_event(&e);
changes.add_stripped_state_event(&room_id, e.clone());
}
}
state.events.push(e); changes.stripped_members.insert(room_id.clone(), members);
} changes.stripped_state.insert(room_id.clone(), state_events);
}
let room = InvitedRoom { let room = InvitedRoom {
invite_state: state, invite_state: state,
@ -728,22 +749,36 @@ impl BaseClient {
rooms.invite.insert(room_id, room); rooms.invite.insert(room_id, room);
} }
for event in &response.presence.events { let presence: BTreeMap<UserId, PresenceEvent> = response
if let Ok(e) = event.deserialize() { .presence
changes.add_presence_event(e); .events
} .into_iter()
} .filter_map(|e| {
let event = e.deserialize().ok()?;
Some((event.sender.clone(), event))
})
.collect();
for event in &response.account_data.events { changes.presence = presence;
if let Ok(e) = event.deserialize() {
changes.add_account_data(e); let account_data: BTreeMap<String, AnyBasicEvent> = response
} .account_data
} .events
.into_iter()
.filter_map(|e| {
let event = e.deserialize().ok()?;
Some((event.content().event_type().to_owned(), event))
})
.collect();
changes.account_data = account_data;
self.store.save_changes(&changes).await; self.store.save_changes(&changes).await;
*self.sync_token.write().await = Some(response.next_batch.clone()); *self.sync_token.write().await = Some(response.next_batch.clone());
self.apply_changes(&changes).await; self.apply_changes(&changes).await;
info!("Processed a sync response in {:?}", now.elapsed().unwrap());
Ok(SyncResponse { Ok(SyncResponse {
next_batch: response.next_batch, next_batch: response.next_batch,
rooms, rooms,
@ -784,34 +819,47 @@ impl BaseClient {
let mut changes = StateChanges::default(); let mut changes = StateChanges::default();
// TODO make sure we don't overwrite memership events from a sync. let members: BTreeMap<UserId, MemberEvent> =
for e in &response.chunk { futures::stream::iter(response.chunk.iter())
if let Ok(event) = hoist_member_event(e) { .map(|e| {
if let Ok(user_id) = UserId::try_from(event.state_key.as_str()) { hoist_member_event(e)
if self .ok()
.store .map(|e| MemberEvent::try_from(e).ok())
.get_member_event(room_id, &user_id) .flatten()
.await })
.is_none() .filter_map(|e| async {
{ if let Some(e) = e {
handle_membership(&mut changes, room_id, &event.into()); if self
.store
.get_member_event(room_id, &e.state_key)
.await
.is_some()
{
Some((e.state_key.clone(), e))
} else {
None
}
} else {
None
} }
} })
} .collect()
} .await;
#[cfg(feature = "encryption")] changes.members.insert(room_id.clone(), members);
if summary.is_encrypted() {
if let Some(o) = self.olm_machine().await {
if let Some(users) = changes.joined_user_ids.get(room_id) {
o.update_tracked_users(users).await
}
if let Some(users) = changes.invited_user_ids.get(room_id) { // #[cfg(feature = "encryption")]
o.update_tracked_users(users).await // if summary.is_encrypted() {
} // if let Some(o) = self.olm_machine().await {
} // if let Some(users) = changes.joined_user_ids.get(room_id) {
} // o.update_tracked_users(users).await
// }
// if let Some(users) = changes.invited_user_ids.get(room_id) {
// o.update_tracked_users(users).await
// }
// }
// }
changes.add_room(summary); changes.add_room(summary);

View File

@ -1,15 +1,16 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::{collections::BTreeMap, convert::TryFrom, time::SystemTime};
use matrix_sdk_common::{ use matrix_sdk_common::{
api::r0::sync::sync_events::{ api::r0::sync::sync_events::{
DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount, DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount,
}, },
events::{ events::{
presence::PresenceEvent, AnyBasicEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent,
AnySyncRoomEvent, AnySyncStateEvent, AnyToDeviceEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent,
AnyToDeviceEvent, StateEvent, StrippedStateEvent, SyncStateEvent, Unsigned,
}, },
identifiers::{DeviceKeyAlgorithm, RoomId}, identifiers::{DeviceKeyAlgorithm, EventId, RoomId, UserId},
}; };
#[derive(Clone, Debug, Default, Deserialize, Serialize)] #[derive(Clone, Debug, Default, Deserialize, Serialize)]
@ -204,3 +205,97 @@ pub struct State {
/// A list of state events. /// A list of state events.
pub events: Vec<AnySyncStateEvent>, pub events: Vec<AnySyncStateEvent>,
} }
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(
try_from = "SyncStateEvent<MemberEventContent>",
into = "SyncStateEvent<MemberEventContent>"
)]
pub struct MemberEvent {
pub content: MemberEventContent,
pub event_id: EventId,
pub origin_server_ts: SystemTime,
pub prev_content: Option<MemberEventContent>,
pub sender: UserId,
pub state_key: UserId,
pub unsigned: Unsigned,
}
impl TryFrom<SyncStateEvent<MemberEventContent>> for MemberEvent {
type Error = matrix_sdk_common::identifiers::Error;
fn try_from(event: SyncStateEvent<MemberEventContent>) -> Result<Self, Self::Error> {
Ok(MemberEvent {
content: event.content,
event_id: event.event_id,
origin_server_ts: event.origin_server_ts,
prev_content: event.prev_content,
sender: event.sender,
state_key: UserId::try_from(event.state_key)?,
unsigned: event.unsigned,
})
}
}
impl TryFrom<StateEvent<MemberEventContent>> for MemberEvent {
type Error = matrix_sdk_common::identifiers::Error;
fn try_from(event: StateEvent<MemberEventContent>) -> Result<Self, Self::Error> {
Ok(MemberEvent {
content: event.content,
event_id: event.event_id,
origin_server_ts: event.origin_server_ts,
prev_content: event.prev_content,
sender: event.sender,
state_key: UserId::try_from(event.state_key)?,
unsigned: event.unsigned,
})
}
}
impl Into<SyncStateEvent<MemberEventContent>> for MemberEvent {
fn into(self) -> SyncStateEvent<MemberEventContent> {
SyncStateEvent {
content: self.content,
event_id: self.event_id,
sender: self.sender,
origin_server_ts: self.origin_server_ts,
state_key: self.state_key.to_string(),
prev_content: self.prev_content,
unsigned: self.unsigned,
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(
try_from = "StrippedStateEvent<MemberEventContent>",
into = "StrippedStateEvent<MemberEventContent>"
)]
pub struct StrippedMemberEvent {
pub content: MemberEventContent,
pub sender: UserId,
pub state_key: UserId,
}
impl TryFrom<StrippedStateEvent<MemberEventContent>> for StrippedMemberEvent {
type Error = matrix_sdk_common::identifiers::Error;
fn try_from(event: StrippedStateEvent<MemberEventContent>) -> Result<Self, Self::Error> {
Ok(StrippedMemberEvent {
content: event.content,
sender: event.sender,
state_key: UserId::try_from(event.state_key)?,
})
}
}
impl Into<StrippedStateEvent<MemberEventContent>> for StrippedMemberEvent {
fn into(self) -> StrippedStateEvent<MemberEventContent> {
StrippedStateEvent {
content: self.content,
sender: self.sender,
state_key: self.state_key.to_string(),
}
}
}

View File

@ -367,8 +367,4 @@ impl RoomInfo {
changed changed
} }
pub fn serialize(&self) -> Vec<u8> {
serde_json::to_vec(&self).unwrap()
}
} }

View File

@ -12,22 +12,16 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::{ use std::sync::{Arc, Mutex as SyncMutex};
convert::TryFrom,
sync::{Arc, Mutex as SyncMutex},
};
use futures::stream::{Stream, StreamExt};
use matrix_sdk_common::{ use matrix_sdk_common::{
events::{room::encryption::EncryptionEventContent, AnyStrippedStateEvent, EventType}, events::{room::encryption::EncryptionEventContent, AnyStrippedStateEvent},
identifiers::{RoomAliasId, RoomId, UserId}, identifiers::{RoomAliasId, RoomId, UserId},
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::store::Store; use crate::store::Store;
use super::RoomMember;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StrippedRoom { pub struct StrippedRoom {
room_id: Arc<RoomId>, room_id: Arc<RoomId>,

View File

@ -1,10 +1,11 @@
use std::{collections::BTreeMap, convert::TryFrom, path::Path}; use std::{collections::BTreeMap, convert::TryFrom, path::Path, time::SystemTime};
use futures::stream::{self, Stream}; use futures::stream::{self, Stream};
use matrix_sdk_common::{ use matrix_sdk_common::{
events::{ events::{
presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent, presence::PresenceEvent,
AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType, StrippedStateEvent, room::member::{MemberEventContent, MembershipState},
AnyBasicEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
SyncStateEvent, SyncStateEvent,
}, },
identifiers::{RoomId, UserId}, identifiers::{RoomId, UserId},
@ -13,7 +14,11 @@ use matrix_sdk_common::{
use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree}; use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree};
use tracing::info; use tracing::info;
use crate::{rooms::RoomInfo, Session}; use crate::{
responses::{MemberEvent, StrippedMemberEvent},
rooms::RoomInfo,
Session,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Store { pub struct Store {
@ -35,58 +40,22 @@ pub struct Store {
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct StateChanges { pub struct StateChanges {
pub session: Option<Session>, pub session: Option<Session>,
pub members: BTreeMap<RoomId, BTreeMap<UserId, SyncStateEvent<MemberEventContent>>>, pub members: BTreeMap<RoomId, BTreeMap<UserId, MemberEvent>>,
pub state: BTreeMap<RoomId, BTreeMap<String, AnySyncStateEvent>>, pub state: BTreeMap<RoomId, BTreeMap<String, AnySyncStateEvent>>,
pub account_data: BTreeMap<String, AnyBasicEvent>, pub account_data: BTreeMap<String, AnyBasicEvent>,
pub room_account_data: BTreeMap<RoomId, BTreeMap<String, AnyBasicEvent>>, pub room_account_data: BTreeMap<RoomId, BTreeMap<String, AnyBasicEvent>>,
pub room_infos: BTreeMap<RoomId, RoomInfo>, pub room_infos: BTreeMap<RoomId, RoomInfo>,
pub joined_user_ids: BTreeMap<RoomId, Vec<UserId>>,
pub invited_user_ids: BTreeMap<RoomId, Vec<UserId>>,
pub removed_user_ids: BTreeMap<RoomId, UserId>,
pub presence: BTreeMap<UserId, PresenceEvent>, pub presence: BTreeMap<UserId, PresenceEvent>,
pub stripped_state: BTreeMap<RoomId, BTreeMap<String, AnyStrippedStateEvent>>, pub stripped_state: BTreeMap<RoomId, BTreeMap<String, AnyStrippedStateEvent>>,
pub stripped_members: pub stripped_members: BTreeMap<RoomId, BTreeMap<UserId, StrippedMemberEvent>>,
BTreeMap<RoomId, BTreeMap<UserId, StrippedStateEvent<MemberEventContent>>>,
pub invited_room_info: BTreeMap<RoomId, RoomInfo>, pub invited_room_info: BTreeMap<RoomId, RoomInfo>,
} }
impl StateChanges { impl StateChanges {
pub fn add_joined_member(
&mut self,
room_id: &RoomId,
event: SyncStateEvent<MemberEventContent>,
) {
let user_id = UserId::try_from(event.state_key.as_str()).unwrap();
self.joined_user_ids
.entry(room_id.to_owned())
.or_insert_with(Vec::new)
.push(user_id.clone());
self.members
.entry(room_id.to_owned())
.or_insert_with(BTreeMap::new)
.insert(user_id, event);
}
pub fn add_presence_event(&mut self, event: PresenceEvent) { pub fn add_presence_event(&mut self, event: PresenceEvent) {
self.presence.insert(event.sender.clone(), event); self.presence.insert(event.sender.clone(), event);
} }
pub fn add_invited_member(
&mut self,
room_id: &RoomId,
event: SyncStateEvent<MemberEventContent>,
) {
let user_id = UserId::try_from(event.state_key.as_str()).unwrap();
self.invited_user_ids
.entry(room_id.to_owned())
.or_insert_with(Vec::new)
.push(user_id.clone());
self.members
.entry(room_id.to_owned())
.or_insert_with(BTreeMap::new)
.insert(user_id, event);
}
pub fn add_room(&mut self, room: RoomInfo) { pub fn add_room(&mut self, room: RoomInfo) {
self.room_infos self.room_infos
.insert(room.room_id.as_ref().to_owned(), room); .insert(room.room_id.as_ref().to_owned(), room);
@ -111,11 +80,7 @@ impl StateChanges {
.insert(event.state_key().to_string(), event); .insert(event.state_key().to_string(), event);
} }
pub fn add_stripped_member( pub fn add_stripped_member(&mut self, room_id: &RoomId, event: StrippedMemberEvent) {
&mut self,
room_id: &RoomId,
event: StrippedStateEvent<MemberEventContent>,
) {
let user_id = UserId::try_from(event.state_key.as_str()).unwrap(); let user_id = UserId::try_from(event.state_key.as_str()).unwrap();
self.stripped_members self.stripped_members
.entry(room_id.to_owned()) .entry(room_id.to_owned())
@ -129,13 +94,6 @@ impl StateChanges {
.or_insert_with(BTreeMap::new) .or_insert_with(BTreeMap::new)
.insert(event.content().event_type().to_string(), event); .insert(event.content().event_type().to_string(), event);
} }
pub fn from_event(room_id: &RoomId, event: SyncStateEvent<MemberEventContent>) -> Self {
let mut changes = Self::default();
changes.add_joined_member(room_id, event);
changes
}
} }
impl From<Session> for StateChanges { impl From<Session> for StateChanges {
@ -209,6 +167,8 @@ impl Store {
} }
pub async fn save_changes(&self, changes: &StateChanges) { pub async fn save_changes(&self, changes: &StateChanges) {
let now = SystemTime::now();
let ret: TransactionResult<()> = ( let ret: TransactionResult<()> = (
&self.session, &self.session,
&self.account_data, &self.account_data,
@ -244,6 +204,23 @@ impl Store {
for (room, events) in &changes.members { for (room, events) in &changes.members {
for (_, event) in events { for (_, event) in events {
let key = format!("{}{}", room.as_str(), event.state_key.as_str());
match event.content.membership {
MembershipState::Join => {
joined.insert(key.as_str(), event.state_key.as_str())?;
invited.remove(key.as_str())?;
}
MembershipState::Invite => {
invited.insert(key.as_str(), event.state_key.as_str())?;
joined.remove(key.as_str())?;
}
_ => {
joined.remove(key.as_str())?;
invited.remove(key.as_str())?;
}
}
members.insert( members.insert(
format!("{}{}", room.as_str(), &event.state_key).as_str(), format!("{}{}", room.as_str(), &event.state_key).as_str(),
serde_json::to_vec(&event).unwrap(), serde_json::to_vec(&event).unwrap(),
@ -265,24 +242,6 @@ impl Store {
} }
} }
for (room, users) in &changes.joined_user_ids {
for user in users {
let key = format!("{}{}", room.as_str(), user.as_str());
info!("SAVING joined {}", &key);
joined.insert(key.as_bytes(), user.as_bytes())?;
invited.remove(key.as_bytes())?;
}
}
for (room, users) in &changes.invited_user_ids {
for user in users {
let key = format!("{}{}", room.as_str(), user.as_str());
info!("SAVING invited {}", &key);
invited.insert(key.as_bytes(), user.as_bytes())?;
joined.remove(key.as_bytes())?;
}
}
for (room, events) in &changes.state { for (room, events) in &changes.state {
for (_, event) in events { for (_, event) in events {
state.insert( state.insert(
@ -299,7 +258,8 @@ impl Store {
} }
for (room_id, summary) in &changes.room_infos { for (room_id, summary) in &changes.room_infos {
summaries.insert(room_id.as_bytes(), summary.serialize())?; summaries
.insert(room_id.as_bytes(), serde_json::to_vec(summary).unwrap())?;
} }
for (sender, event) in &changes.presence { for (sender, event) in &changes.presence {
@ -342,6 +302,8 @@ impl Store {
ret.unwrap(); ret.unwrap();
self.inner.flush_async().await.unwrap(); self.inner.flush_async().await.unwrap();
info!("Saved changes in {:?}", now.elapsed().unwrap());
} }
pub async fn get_presence_event(&self, user_id: &UserId) -> Option<PresenceEvent> { pub async fn get_presence_event(&self, user_id: &UserId) -> Option<PresenceEvent> {