base: Refactor out the room state/timeline handling.

master
Damir Jelić 2020-11-30 17:25:29 +01:00
parent 7dd834a214
commit 0e563a9a81
4 changed files with 99 additions and 90 deletions

View File

@ -458,7 +458,7 @@ impl Client {
///
/// `room_id` - The unique id of the room that should be fetched.
pub fn get_joined_room(&self, room_id: &RoomId) -> Option<Room> {
self.base_client.get_joined_room(room_id)
self.base_client.get_room(room_id)
}
///// Get an invited room with the given room id.
@ -1032,7 +1032,7 @@ impl Client {
let _guard = mutex.lock().await;
{
let room = self.base_client.get_joined_room(room_id).unwrap();
let room = self.base_client.get_room(room_id).unwrap();
let members = room.joined_user_ids().await;
// TODO don't collect here.
let members_iter: Vec<UserId> = members.collect().await;
@ -1134,14 +1134,14 @@ impl Client {
/// Returns true if a room with the given id was found and the room is
/// encrypted, false if the room wasn't found or isn't encrypted.
async fn is_room_encrypted(&self, room_id: &RoomId) -> bool {
match self.base_client.get_joined_room(room_id) {
match self.base_client.get_room(room_id) {
Some(r) => r.is_encrypted(),
None => false,
}
}
async fn are_members_synced(&self, room_id: &RoomId) -> bool {
match self.base_client.get_joined_room(room_id) {
match self.base_client.get_room(room_id) {
Some(r) => r.are_members_synced(),
None => true,
}

View File

@ -56,7 +56,7 @@ use crate::{
error::Result,
responses::{JoinedRoom, Rooms, State, SyncResponse, Timeline},
session::Session,
store::{Room, RoomType, StateChanges, Store},
store::{InnerSummary, Room, RoomType, StateChanges, Store},
};
pub type Token = String;
@ -212,7 +212,7 @@ pub struct BaseClient {
pub(crate) sync_token: Arc<RwLock<Option<Token>>>,
/// Database
store: Store,
joined_rooms: Arc<DashMap<RoomId, Room>>,
rooms: Arc<DashMap<RoomId, Room>>,
#[cfg(feature = "encryption")]
olm: Arc<Mutex<Option<OlmMachine>>>,
#[cfg(feature = "encryption")]
@ -326,7 +326,7 @@ impl BaseClient {
session: Arc::new(RwLock::new(None)),
sync_token: Arc::new(RwLock::new(None)),
store,
joined_rooms: Arc::new(DashMap::new()),
rooms: Arc::new(DashMap::new()),
#[cfg(feature = "encryption")]
olm: Arc::new(Mutex::new(None)),
#[cfg(feature = "encryption")]
@ -446,80 +446,22 @@ impl BaseClient {
.expect("Creating room while not being logged in")
.user_id;
match room_type {
RoomType::Joined => self
.joined_rooms
self.rooms
.entry(room_id.clone())
.or_insert_with(|| Room::new(user_id, self.store.clone(), room_id, room_type))
.clone(),
_ => todo!(),
}
.clone()
}
/// Receive a response from a sync call.
///
/// # Arguments
///
/// * `response` - The response that we received after a successful sync.
pub async fn receive_sync_response(
async fn handle_timeline(
&self,
mut response: api::sync::sync_events::Response,
) -> Result<SyncResponse> {
// The server might respond multiple times with the same sync token, in
// that case we already received this response and there's nothing to
// do.
if self.sync_token.read().await.as_ref() == Some(&response.next_batch) {
return Ok(SyncResponse::new_empty(response.next_batch.clone()));
}
room_id: &RoomId,
ruma_timeline: &api::sync::sync_events::Timeline,
summary: &mut InnerSummary,
mut changes: &mut StateChanges,
) -> Timeline {
let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone());
#[cfg(feature = "encryption")]
{
let olm = self.olm.lock().await;
if let Some(o) = &*olm {
// Let the crypto machine handle the sync response, this
// decryptes to-device events, but leaves room events alone.
// This makes sure that we have the deryption keys for the room
// events at hand.
o.receive_sync_response(&mut response).await?;
}
}
let mut changes = StateChanges::default();
let mut rooms = Rooms::default();
for (room_id, room_info) in &response.rooms.join {
let room = self.get_or_create_room(room_id, RoomType::Joined).await;
let mut summary = room.clone_summary();
summary.update(&room_info.summary);
summary.set_prev_batch(room_info.timeline.prev_batch.as_deref());
let mut state = State::default();
for e in &room_info.state.events {
if let Ok(event) = hoist_and_deserialize_state_event(e) {
match &event {
AnySyncStateEvent::RoomMember(member) => {
handle_membership(&mut changes, room_id, member);
}
e => {
summary.handle_state_event(&e);
changes.add_state_event(room_id, e.clone());
}
}
state.events.push(event);
}
}
let mut timeline = Timeline::new(
room_info.timeline.limited,
room_info.timeline.prev_batch.clone(),
);
for event in &room_info.timeline.events {
for event in &ruma_timeline.events {
if let Ok(mut e) = hoist_room_event_prev_content(event) {
match &mut e {
AnySyncRoomEvent::State(s) => match s {
@ -553,6 +495,89 @@ impl BaseClient {
}
}
timeline
}
async fn handle_state(
&self,
room_id: &RoomId,
events: &[Raw<AnySyncStateEvent>],
summary: &mut InnerSummary,
mut changes: &mut StateChanges,
) -> State {
let mut state = State::default();
for e in events {
if let Ok(event) = hoist_and_deserialize_state_event(e) {
match &event {
AnySyncStateEvent::RoomMember(member) => {
handle_membership(&mut changes, room_id, member);
}
e => {
summary.handle_state_event(&e);
changes.add_state_event(room_id, e.clone());
}
}
state.events.push(event);
}
}
state
}
/// Receive a response from a sync call.
///
/// # Arguments
///
/// * `response` - The response that we received after a successful sync.
pub async fn receive_sync_response(
&self,
mut response: api::sync::sync_events::Response,
) -> Result<SyncResponse> {
// The server might respond multiple times with the same sync token, in
// that case we already received this response and there's nothing to
// do.
if self.sync_token.read().await.as_ref() == Some(&response.next_batch) {
return Ok(SyncResponse::new_empty(response.next_batch.clone()));
}
#[cfg(feature = "encryption")]
{
let olm = self.olm.lock().await;
if let Some(o) = &*olm {
// Let the crypto machine handle the sync response, this
// decryptes to-device events, but leaves room events alone.
// This makes sure that we have the deryption keys for the room
// events at hand.
o.receive_sync_response(&mut response).await?;
}
}
let mut changes = StateChanges::default();
let mut rooms = Rooms::default();
for (room_id, room_info) in &response.rooms.join {
let room = self.get_or_create_room(room_id, RoomType::Joined).await;
let mut summary = room.clone_summary();
summary.update(&room_info.summary);
summary.set_prev_batch(room_info.timeline.prev_batch.as_deref());
let state = self
.handle_state(
&room_id,
&room_info.state.events,
&mut summary,
&mut changes,
)
.await;
let timeline = self
.handle_timeline(&room_id, &room_info.timeline, &mut summary, &mut changes)
.await;
#[cfg(feature = "encryption")]
if summary.is_encrypted() {
// TODO if the room isn't encrypted but the new summary is,
@ -595,7 +620,7 @@ impl BaseClient {
async fn apply_changes(&self, changes: &StateChanges) {
// TODO emit room changes here
for (room_id, summary) in &changes.room_summaries {
if let Some(room) = self.get_joined_room(&room_id) {
if let Some(room) = self.get_room(&room_id) {
room.update_summary(summary.clone())
}
}
@ -606,7 +631,7 @@ impl BaseClient {
room_id: &RoomId,
response: &api::membership::get_member_events::Response,
) -> Result<()> {
if let Some(room) = self.get_joined_room(room_id) {
if let Some(room) = self.get_room(room_id) {
let mut summary = room.clone_summary();
summary.mark_members_synced();
@ -763,8 +788,8 @@ impl BaseClient {
}
}
pub fn get_joined_room(&self, room_id: &RoomId) -> Option<Room> {
self.joined_rooms.get(room_id).map(|r| r.clone())
pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
self.rooms.get(room_id).map(|r| r.clone())
}
/// Encrypt a message event content.

View File

@ -18,7 +18,6 @@ pub struct SyncResponse {
/// Updates to the presence status of other users.
pub presence: Presence,
///// The global private data created by this user.
//#[serde(default, skip_serializing_if = "AccountData::is_empty")]
//pub account_data: AccountData,
/// Messages sent dirrectly between devices.
pub to_device: ToDevice,
@ -74,41 +73,28 @@ pub struct ToDevice {
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Rooms {
// /// The rooms that the user has left or been banned from.
// #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
// pub leave: BTreeMap<RoomId, LeftRoom>,
/// The rooms that the user has joined.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub join: BTreeMap<RoomId, JoinedRoom>,
// /// The rooms that the user has been invited to.
// #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
// pub invite: BTreeMap<RoomId, InvitedRoom>,
}
/// Updates to joined rooms.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JoinedRoom {
// /// Information about the room which clients may need to correctly render it
// /// to users.
// #[serde(default, skip_serializing_if = "RoomSummary::is_empty")]
// pub summary: RoomSummary,
// /// Counts of unread notifications for this room.
// #[serde(default, skip_serializing_if = "UnreadNotificationsCount::is_empty")]
// pub unread_notifications: UnreadNotificationsCount,
/// The timeline of messages and state changes in the room.
pub timeline: Timeline,
/// Updates to the state, between the time indicated by the `since` parameter, and the start
/// of the `timeline` (or all state up to the start of the `timeline`, if `since` is not
/// given, or `full_state` is true).
pub state: State,
// /// The private data that this user has attached to this room.
// #[serde(default, skip_serializing_if = "AccountData::is_empty")]
// pub account_data: AccountData,
// /// The ephemeral events in the room that aren't recorded in the timeline or state of the
// /// room. e.g. typing.
// #[serde(default, skip_serializing_if = "Ephemeral::is_empty")]
// pub ephemeral: Ephemeral,
}

View File

@ -139,8 +139,6 @@ pub enum RoomType {
Joined,
/// Represents a left room, the `left_rooms` HashMap will be used.
Left,
/// Represents an invited room, the `invited_rooms` HashMap will be used.
Invited,
}
impl Room {