base: WIP lazy loading support.

This commit is contained in:
Damir Jelić 2020-11-20 17:20:07 +01:00
parent 3da1d3cf8f
commit 9edf8657d0
3 changed files with 96 additions and 25 deletions

View file

@ -69,7 +69,7 @@ use matrix_sdk_common::{
directory::{get_public_rooms, get_public_rooms_filtered},
media::create_content,
membership::{
ban_user, forget_room,
ban_user, forget_room, get_member_events,
invite_user::{self, InvitationRecipient},
join_room_by_id, join_room_by_id_or_alias, kick_user, leave_room, Invite3pid,
},
@ -1088,6 +1088,11 @@ impl Client {
#[cfg(feature = "encryption")]
let content = if self.is_room_encrypted(room_id).await {
if !self.are_members_synced(room_id).await {
self.room_members(room_id).await?;
// TODO query keys here?
}
self.preshare_group_session(room_id).await?;
AnyMessageEventContent::RoomEncrypted(self.base_client.encrypt(room_id, content).await?)
} else {
@ -1112,6 +1117,13 @@ impl Client {
}
}
async fn are_members_synced(&self, room_id: &RoomId) -> bool {
match self.base_client.get_joined_room(room_id) {
Some(r) => r.is_encrypted(),
None => true,
}
}
/// Send an attachment to a room.
///
/// This will upload the given data that the reader produces using the
@ -1434,6 +1446,16 @@ impl Client {
self.send(request).await
}
#[allow(dead_code)]
async fn room_members(&self, room_id: &RoomId) -> Result<()> {
let request = get_member_events::Request::new(room_id);
let response = self.send(request).await?;
self.base_client.receive_members(room_id, &response).await?;
Ok(())
}
/// Synchronize the client's state with the latest state on the server.
///
/// **Note**: You should not use this method to repeatedly sync if encryption

View file

@ -14,6 +14,7 @@
// limitations under the License.
use std::{
convert::TryFrom,
fmt,
path::{Path, PathBuf},
result::Result as StdResult,
@ -29,7 +30,7 @@ use matrix_sdk_common::{
api::r0 as api,
events::{
room::member::MemberEventContent, AnyStrippedStateEvent, AnySyncRoomEvent,
AnySyncStateEvent, SyncStateEvent,
AnySyncStateEvent, StateEvent, SyncStateEvent,
},
identifiers::{RoomId, UserId},
locks::RwLock,
@ -103,6 +104,22 @@ pub fn hoist_and_deserialize_state_event(
Ok(ev)
}
fn hoist_member_event(
event: &Raw<StateEvent<MemberEventContent>>,
) -> StdResult<StateEvent<MemberEventContent>, serde_json::Error> {
let prev_content = serde_json::from_str::<AdditionalEventData>(event.json().get())?
.unsigned
.prev_content;
let mut e = event.deserialize()?;
if e.prev_content.is_none() {
e.prev_content = prev_content.map(|e| e.deserialize().ok()).flatten();
}
Ok(e)
}
fn hoist_room_event_prev_content(
event: &Raw<AnySyncRoomEvent>,
) -> StdResult<AnySyncRoomEvent, serde_json::Error> {
@ -132,6 +149,27 @@ fn stripped_deserialize_prev_content(
.ok()
}
fn handle_membership(
changes: &mut StateChanges,
room_id: &RoomId,
event: SyncStateEvent<MemberEventContent>,
) {
use matrix_sdk_common::events::room::member::MembershipState::*;
match &event.content.membership {
Join => {
info!("ADDING MEMBER {} to {}", event.state_key, room_id);
changes.add_joined_member(room_id, event)
// TODO check if the display name is
// ambigous
}
Invite => {
info!("ADDING INVITED MEMBER {} to {}", event.state_key, room_id);
changes.add_invited_member(room_id, event)
}
_ => info!("UNHANDLED MEMBERSHIP"),
}
}
/// Signals to the `BaseClient` which `RoomState` to send to `EventEmitter`.
#[derive(Debug)]
pub enum RoomStateType {
@ -449,29 +487,6 @@ impl BaseClient {
// apply and emit the new events and rooms.
let mut changes = StateChanges::default();
let handle_membership =
|changes: &mut StateChanges, room_id, event: SyncStateEvent<MemberEventContent>| {
// let member_id = UserId::try_from(member.state_key.clone()).unwrap();
// self.store.get_member_event(room_id, &member_id).await;
// TODO this isn't right, check the diff against
// your previous state.
use matrix_sdk_common::events::room::member::MembershipState::*;
match &event.content.membership {
Join => {
info!("ADDING MEMBER {} to {}", event.state_key, room_id);
changes.add_joined_member(room_id, event)
// TODO check if the display name is
// ambigous
}
Invite => {
info!("ADDING INVITED MEMBER {} to {}", event.state_key, room_id);
changes.add_invited_member(room_id, event)
}
_ => info!("UNHANDLED MEMBERSHIP"),
}
};
for (room_id, room_info) in &response.rooms.join {
let room = self.get_or_create_room(room_id, RoomType::Joined).await;
@ -543,6 +558,36 @@ impl BaseClient {
Ok(())
}
pub async fn receive_members(
&self,
room_id: &RoomId,
response: &api::membership::get_member_events::Response,
) -> Result<()> {
if self.get_joined_room(room_id).is_some() {
let mut changes = StateChanges::default();
// TODO make sure we don't overwrite memership events from a sync.
for e in &response.chunk {
if let Ok(event) = hoist_member_event(e) {
if let Ok(user_id) = UserId::try_from(event.state_key.as_str()) {
if self
.store
.get_member_event(room_id, &user_id)
.await
.is_none()
{
handle_membership(&mut changes, room_id, event.into());
}
}
}
}
self.store.save_changes(&changes).await;
}
Ok(())
}
/// Should the client share a group session for the given room.
///
/// Returns true if a session needs to be shared before room messages can be

View file

@ -158,6 +158,10 @@ impl Room {
}
}
pub async fn are_members_synced(&self) -> bool {
true
}
pub async fn get_j_members(&self) -> impl Stream<Item = RoomMember> + '_ {
let joined = self.store.get_joined_user_ids(self.room_id()).await;
let invited = self.store.get_invited_user_ids(self.room_id()).await;