event_emitter/async_client: receive and emit events for invited and left rooms

master
Devin R 2020-05-05 16:13:14 -04:00
parent 7b6e030823
commit 0e538a7c67
4 changed files with 253 additions and 18 deletions

View File

@ -645,6 +645,22 @@ impl AsyncClient {
let mut response = self.send(request).await?;
// when events change state updated signals to state store to update database
let mut updated = self.iter_joined_rooms(&mut response).await?;
if self.iter_invited_rooms(&mut response).await? {
updated = true;
}
if self.iter_left_rooms(&mut response).await? {
updated = true;
}
let mut client = self.base_client.write().await;
client.receive_sync_response(&mut response, updated).await?;
Ok(response)
}
async fn iter_joined_rooms(&self, response: &mut sync_events::Response) -> Result<bool> {
let mut updated = false;
for (room_id, room) in &mut response.rooms.join {
let matrix_room = {
@ -745,11 +761,92 @@ impl AsyncClient {
}
}
}
Ok(updated)
}
async fn iter_left_rooms(&self, response: &mut sync_events::Response) -> Result<bool> {
let mut updated = false;
for (room_id, left_room) in &mut response.rooms.leave {
let matrix_room = {
let mut client = self.base_client.write().await;
client.receive_sync_response(&mut response, updated).await?;
for mut event in &mut left_room.timeline.events {
let decrypted_event = {
let mut client = self.base_client.write().await;
let (decrypt_ev, timeline_update) = client
.receive_joined_timeline_event(room_id, &mut event)
.await;
if timeline_update {
updated = true;
};
decrypt_ev
};
Ok(response)
if let Some(e) = decrypted_event {
*event = e;
}
if let Ok(e) = event.deserialize() {
let client = self.base_client.read().await;
client.emit_timeline_event(&room_id, &e).await;
}
}
client.get_or_create_room(&room_id).clone()
};
// re looping is not ideal here
for event in &mut left_room.state.events {
if let Ok(e) = event.deserialize() {
let client = self.base_client.read().await;
client.emit_state_event(&room_id, &e).await;
}
}
if updated {
if let Some(store) = self.base_client.read().await.state_store.as_ref() {
store
.store_room_state(matrix_room.read().await.deref())
.await?;
}
}
}
Ok(updated)
}
async fn iter_invited_rooms(&self, response: &mut sync_events::Response) -> Result<bool> {
let mut updated = false;
// INVITED ROOMS
for (room_id, invited_room) in &mut response.rooms.invite {
let matrix_room = {
let mut client = self.base_client.write().await;
for event in &invited_room.invite_state.events {
if let Ok(e) = event.deserialize() {
if client.receive_invite_state_event(&room_id, &e).await {
updated = true;
}
}
}
client.get_or_create_room(&room_id).clone()
};
// re looping is not ideal here
for event in &mut invited_room.invite_state.events {
if let Ok(e) = event.deserialize() {
let client = self.base_client.read().await;
client.emit_stripped_state_event(&room_id, &e).await;
}
}
if updated {
if let Some(store) = self.base_client.read().await.state_store.as_ref() {
store
.store_room_state(matrix_room.read().await.deref())
.await?;
}
}
}
Ok(updated)
}
/// Repeatedly call sync to synchronize the client state with the server.

View File

@ -30,6 +30,7 @@ use crate::events::presence::PresenceEvent;
use crate::events::collections::only::Event as NonRoomEvent;
use crate::events::ignored_user_list::IgnoredUserListEvent;
use crate::events::push_rules::{PushRulesEvent, Ruleset};
use crate::events::stripped::AnyStrippedStateEvent;
use crate::events::EventJson;
use crate::identifiers::{RoomId, UserId};
use crate::models::Room;
@ -333,6 +334,25 @@ impl Client {
room.receive_state_event(event)
}
/// Receive a state event for a room the user has been invited to.
///
/// Returns true if the state of the room changed, false
/// otherwise.
///
/// # Arguments
///
/// * `room_id` - The unique id of the room the event belongs to.
///
/// * `event` - A `AnyStrippedStateEvent` that should be handled by the client.
pub async fn receive_invite_state_event(
&mut self,
room_id: &RoomId,
event: &AnyStrippedStateEvent,
) -> bool {
let mut room = self.get_or_create_room(room_id).write().await;
room.receive_stripped_state_event(event)
}
/// Receive a presence event from a sync response and updates the client state.
///
/// Returns true if the membership list of the room changed, false
@ -707,6 +727,71 @@ impl Client {
}
}
pub(crate) async fn emit_stripped_state_event(
&self,
room_id: &RoomId,
event: &AnyStrippedStateEvent,
) {
match event {
AnyStrippedStateEvent::RoomMember(member) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) {
ee.on_stripped_state_member(Arc::clone(&room), &member)
.await;
}
}
}
AnyStrippedStateEvent::RoomName(name) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) {
ee.on_stripped_state_name(Arc::clone(&room), &name).await;
}
}
}
AnyStrippedStateEvent::RoomCanonicalAlias(canonical) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) {
ee.on_stripped_state_canonical_alias(Arc::clone(&room), &canonical)
.await;
}
}
}
AnyStrippedStateEvent::RoomAliases(aliases) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) {
ee.on_stripped_state_aliases(Arc::clone(&room), &aliases)
.await;
}
}
}
AnyStrippedStateEvent::RoomAvatar(avatar) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) {
ee.on_stripped_state_avatar(Arc::clone(&room), &avatar)
.await;
}
}
}
AnyStrippedStateEvent::RoomPowerLevels(power) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) {
ee.on_stripped_state_power_levels(Arc::clone(&room), &power)
.await;
}
}
}
AnyStrippedStateEvent::RoomJoinRules(rules) => {
if let Some(ee) = &self.event_emitter {
if let Some(room) = self.get_room(&room_id) {
ee.on_stripped_state_join_rules(Arc::clone(&room), &rules)
.await;
}
}
}
_ => {}
}
}
pub(crate) async fn emit_state_event(&self, room_id: &RoomId, event: &StateEvent) {
match event {
StateEvent::RoomMember(member) => {

View File

@ -19,6 +19,7 @@ use crate::events::{
ignored_user_list::IgnoredUserListEvent,
presence::PresenceEvent,
push_rules::PushRulesEvent,
receipt::ReceiptEvent,
room::{
aliases::AliasesEvent,
avatar::AvatarEvent,
@ -31,6 +32,11 @@ use crate::events::{
redaction::RedactionEvent,
tombstone::TombstoneEvent,
},
stripped::{
StrippedRoomAliases, StrippedRoomAvatar, StrippedRoomCanonicalAlias, StrippedRoomJoinRules,
StrippedRoomMember, StrippedRoomName, StrippedRoomPowerLevels,
},
typing::TypingEvent,
};
use crate::models::Room;
use tokio::sync::RwLock;
@ -101,6 +107,10 @@ pub trait EventEmitter: Send + Sync {
async fn on_room_power_levels(&self, _: Arc<RwLock<Room>>, _: &PowerLevelsEvent) {}
/// Fires when `AsyncClient` receives a `RoomEvent::Tombstone` event.
async fn on_room_tombstone(&self, _: Arc<RwLock<Room>>, _: &TombstoneEvent) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::Typing` event.
async fn on_account_data_typing(&self, _: Arc<RwLock<Room>>, _: &TypingEvent) {}
/// Fires when `AsyncClient` receives a `NonRoomEvent::Typing` event.
async fn on_account_data_receipt(&self, _: Arc<RwLock<Room>>, _: &ReceiptEvent) {}
// `RoomEvent`s from `IncomingState`
/// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event.
@ -118,6 +128,32 @@ pub trait EventEmitter: Send + Sync {
/// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event.
async fn on_state_join_rules(&self, _: Arc<RwLock<Room>>, _: &JoinRulesEvent) {}
// `AnyStrippedStateEvent`s
/// Fires when `AsyncClient` receives a `StateEvent::RoomMember` event.
async fn on_stripped_state_member(&self, _: Arc<RwLock<Room>>, _: &StrippedRoomMember) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomName` event.
async fn on_stripped_state_name(&self, _: Arc<RwLock<Room>>, _: &StrippedRoomName) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomCanonicalAlias` event.
async fn on_stripped_state_canonical_alias(
&self,
_: Arc<RwLock<Room>>,
_: &StrippedRoomCanonicalAlias,
) {
}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAliases` event.
async fn on_stripped_state_aliases(&self, _: Arc<RwLock<Room>>, _: &StrippedRoomAliases) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomAvatar` event.
async fn on_stripped_state_avatar(&self, _: Arc<RwLock<Room>>, _: &StrippedRoomAvatar) {}
/// Fires when `AsyncClient` receives a `StateEvent::RoomPowerLevels` event.
async fn on_stripped_state_power_levels(
&self,
_: Arc<RwLock<Room>>,
_: &StrippedRoomPowerLevels,
) {
}
/// Fires when `AsyncClient` receives a `StateEvent::RoomJoinRules` event.
async fn on_stripped_state_join_rules(&self, _: Arc<RwLock<Room>>, _: &StrippedRoomJoinRules) {}
// `NonRoomEvent` (this is a type alias from ruma_events) from `IncomingAccountData`
/// Fires when `AsyncClient` receives a `NonRoomEvent::RoomMember` event.
async fn on_account_presence(&self, _: Arc<RwLock<Room>>, _: &PresenceEvent) {}

View File

@ -30,6 +30,7 @@ use crate::events::room::{
power_levels::{NotificationPowerLevels, PowerLevelsEvent, PowerLevelsEventContent},
tombstone::TombstoneEvent,
};
use crate::events::stripped::AnyStrippedStateEvent;
use crate::events::EventType;
use crate::identifiers::{RoomAliasId, RoomId, UserId};
@ -183,7 +184,7 @@ impl RoomName {
})
.collect::<Vec<String>>();
names.sort();
// TODO what is the length the spec wants us to use here and in the `else`
// TODO what length does the spec want us to use here and in the `else`
format!("{}, and {} others", names.join(", "), (joined + invited))
} else {
format!("Empty Room (was {} others)", members.len())
@ -396,15 +397,15 @@ impl Room {
pub fn receive_timeline_event(&mut self, event: &RoomEvent) -> bool {
match event {
// update to the current members of the room
RoomEvent::RoomMember(m) => self.handle_membership(m),
RoomEvent::RoomMember(member) => self.handle_membership(member),
// finds all events related to the name of the room for later use
RoomEvent::RoomName(n) => self.handle_room_name(n),
RoomEvent::RoomCanonicalAlias(ca) => self.handle_canonical(ca),
RoomEvent::RoomAliases(a) => self.handle_room_aliases(a),
RoomEvent::RoomName(name) => self.handle_room_name(name),
RoomEvent::RoomCanonicalAlias(c_alias) => self.handle_canonical(c_alias),
RoomEvent::RoomAliases(alias) => self.handle_room_aliases(alias),
// power levels of the room members
RoomEvent::RoomPowerLevels(p) => self.handle_power_level(p),
RoomEvent::RoomTombstone(t) => self.handle_tombstone(t),
RoomEvent::RoomEncryption(e) => self.handle_encryption_event(e),
RoomEvent::RoomPowerLevels(power) => self.handle_power_level(power),
RoomEvent::RoomTombstone(tomb) => self.handle_tombstone(tomb),
RoomEvent::RoomEncryption(encrypt) => self.handle_encryption_event(encrypt),
_ => false,
}
}
@ -418,17 +419,33 @@ impl Room {
/// * `event` - The event of the room.
pub fn receive_state_event(&mut self, event: &StateEvent) -> bool {
match event {
StateEvent::RoomMember(m) => self.handle_membership(m),
StateEvent::RoomName(n) => self.handle_room_name(n),
StateEvent::RoomCanonicalAlias(ca) => self.handle_canonical(ca),
StateEvent::RoomAliases(a) => self.handle_room_aliases(a),
StateEvent::RoomPowerLevels(p) => self.handle_power_level(p),
StateEvent::RoomTombstone(t) => self.handle_tombstone(t),
StateEvent::RoomEncryption(e) => self.handle_encryption_event(e),
// update to the current members of the room
StateEvent::RoomMember(member) => self.handle_membership(member),
// finds all events related to the name of the room for later use
StateEvent::RoomName(name) => self.handle_room_name(name),
StateEvent::RoomCanonicalAlias(c_alias) => self.handle_canonical(c_alias),
StateEvent::RoomAliases(alias) => self.handle_room_aliases(alias),
// power levels of the room members
StateEvent::RoomPowerLevels(power) => self.handle_power_level(power),
StateEvent::RoomTombstone(tomb) => self.handle_tombstone(tomb),
StateEvent::RoomEncryption(encrypt) => self.handle_encryption_event(encrypt),
_ => false,
}
}
/// Receive a stripped state event for this room and update the room state.
///
/// Returns true if the state of the `Room` has changed, false otherwise.
///
/// # Arguments
///
/// * `event` - The `AnyStrippedStateEvent` sent by the server for invited but not
/// joined rooms.
pub fn receive_stripped_state_event(&mut self, _event: &AnyStrippedStateEvent) -> bool {
// TODO do we want to do anything with the events from an invited room?
true
}
/// Receive a presence event from an `IncomingResponse` and updates the client state.
///
/// This will only update the user if found in the current room looped through by `AsyncClient::sync`.