From 3f2c5d22b6edba210a8495605f839387032cb55f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Commaille?= Date: Mon, 12 Apr 2021 11:09:39 +0200 Subject: [PATCH] matrix-sdk: Get notifications locally on sync --- matrix_sdk/src/event_handler/mod.rs | 78 +++++- matrix_sdk_base/src/client.rs | 231 +++++++++++++++--- matrix_sdk_base/src/rooms/normal.rs | 5 + matrix_sdk_base/src/store/memory_store.rs | 11 + matrix_sdk_base/src/store/mod.rs | 11 + matrix_sdk_base/src/store/sled_store/mod.rs | 17 +- .../src/deserialized_responses.rs | 9 +- matrix_sdk_test/src/test_json/sync.rs | 19 +- 8 files changed, 340 insertions(+), 41 deletions(-) diff --git a/matrix_sdk/src/event_handler/mod.rs b/matrix_sdk/src/event_handler/mod.rs index fb2b485d..17d706b3 100644 --- a/matrix_sdk/src/event_handler/mod.rs +++ b/matrix_sdk/src/event_handler/mod.rs @@ -14,7 +14,9 @@ // limitations under the License. use std::ops::Deref; -use matrix_sdk_common::{events::AnySyncRoomEvent, identifiers::RoomId}; +use matrix_sdk_common::{ + api::r0::push::get_notifications::Notification, events::AnySyncRoomEvent, identifiers::RoomId, +}; use serde_json::value::RawValue as RawJsonValue; use crate::{ @@ -118,6 +120,15 @@ impl Handler { for event in &response.presence.events { self.on_presence_event(event).await; } + + for (room_id, notifications) in &response.notifications { + if let Some(room) = self.get_room(&room_id) { + for notification in notifications { + self.on_room_notification(room.clone(), notification.clone()) + .await; + } + } + } } async fn handle_timeline_event(&self, room: Room, event: &AnySyncRoomEvent) { @@ -348,6 +359,10 @@ pub trait EventHandler: Send + Sync { /// Fires when `Client` receives a `RoomEvent::Tombstone` event. async fn on_room_tombstone(&self, _: Room, _: &SyncStateEvent) {} + /// Fires when `Client` receives room events that trigger notifications according to + /// the push rules of the user. + async fn on_room_notification(&self, _: Room, _: Notification) {} + // `RoomEvent`s from `IncomingState` /// Fires when `Client` receives a `StateEvent::RoomMember` event. async fn on_state_member(&self, _: Room, _: &SyncStateEvent) {} @@ -667,6 +682,9 @@ mod test { async fn on_custom_event(&self, _: Room, _: &CustomEvent<'_>) { self.0.lock().await.push("custom event".to_string()) } + async fn on_room_notification(&self, _: Room, _: Notification) { + self.0.lock().await.push("notification".to_string()) + } } use crate::{identifiers::user_id, Client, Session, SyncSettings}; @@ -674,7 +692,7 @@ mod test { async fn get_client() -> Client { let session = Session { access_token: "1234".to_owned(), - user_id: user_id!("@example:example.com"), + user_id: user_id!("@example:localhost"), device_id: "DEVICEID".into(), }; let homeserver = url::Url::parse(&mockito::server_url()).unwrap(); @@ -683,7 +701,7 @@ mod test { client } - async fn mock_sync(client: Client, response: String) { + async fn mock_sync(client: &Client, response: String) { let _m = mock( "GET", Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), @@ -705,7 +723,7 @@ mod test { let client = get_client().await; client.set_event_handler(handler).await; - mock_sync(client, test_json::SYNC.to_string()).await; + mock_sync(&client, test_json::SYNC.to_string()).await; let v = test_vec.lock().await; assert_eq!( @@ -723,6 +741,7 @@ mod test { "state member", "message", "presence event", + "notification", ], ) } @@ -735,7 +754,7 @@ mod test { let client = get_client().await; client.set_event_handler(handler).await; - mock_sync(client, test_json::INVITE_SYNC.to_string()).await; + mock_sync(&client, test_json::INVITE_SYNC.to_string()).await; let v = test_vec.lock().await; assert_eq!( @@ -756,7 +775,7 @@ mod test { let client = get_client().await; client.set_event_handler(handler).await; - mock_sync(client, test_json::LEAVE_SYNC.to_string()).await; + mock_sync(&client, test_json::LEAVE_SYNC.to_string()).await; let v = test_vec.lock().await; assert_eq!( @@ -772,6 +791,7 @@ mod test { "state member", "message", "presence event", + "notification", ], ) } @@ -784,7 +804,7 @@ mod test { let client = get_client().await; client.set_event_handler(handler).await; - mock_sync(client, test_json::MORE_SYNC.to_string()).await; + mock_sync(&client, test_json::MORE_SYNC.to_string()).await; let v = test_vec.lock().await; assert_eq!( @@ -795,6 +815,7 @@ mod test { "message", "message", // this is a message edit event "redaction", + "message", // this is a notice event ], ) } @@ -807,7 +828,7 @@ mod test { let client = get_client().await; client.set_event_handler(handler).await; - mock_sync(client, test_json::VOIP_SYNC.to_string()).await; + mock_sync(&client, test_json::VOIP_SYNC.to_string()).await; let v = test_vec.lock().await; assert_eq!( @@ -820,4 +841,45 @@ mod test { ], ) } + + #[async_test] + async fn event_handler_two_syncs() { + let vec = Arc::new(Mutex::new(Vec::new())); + let test_vec = Arc::clone(&vec); + let handler = Box::new(EvHandlerTest(vec)); + + let client = get_client().await; + client.set_event_handler(handler).await; + mock_sync(&client, test_json::SYNC.to_string()).await; + mock_sync(&client, test_json::MORE_SYNC.to_string()).await; + + let v = test_vec.lock().await; + assert_eq!( + v.as_slice(), + [ + "receipt event", + "account read", + "account ignore", + "state rules", + "state member", + "state aliases", + "state power", + "state canonical", + "state member", + "state member", + "message", + "presence event", + "notification", + "receipt event", + "typing event", + "message", + "message", // this is a message edit event + "redaction", + "message", // this is a notice event + "notification", + "notification", + "notification", + ], + ) + } } diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index 8247cba4..c7c93570 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -20,26 +20,9 @@ use std::{ path::{Path, PathBuf}, result::Result as StdResult, sync::Arc, + time::SystemTime, }; -use matrix_sdk_common::{ - api::r0 as api, - deserialized_responses::{ - AccountData, AmbiguityChanges, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, - MemberEvent, MembersResponse, Presence, Rooms, State, StrippedMemberEvent, SyncResponse, - Timeline, - }, - events::{ - presence::PresenceEvent, - room::member::{MemberEventContent, MembershipState}, - AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, - AnyToDeviceEvent, EventContent, StateEvent, - }, - identifiers::{RoomId, UserId}, - instant::Instant, - locks::RwLock, - Raw, -}; #[cfg(feature = "encryption")] use matrix_sdk_common::{ api::r0::keys::claim_keys::Request as KeysClaimRequest, @@ -51,6 +34,25 @@ use matrix_sdk_common::{ locks::Mutex, uuid::Uuid, }; +use matrix_sdk_common::{ + api::r0::{self as api, push::get_notifications::Notification}, + deserialized_responses::{ + AccountData, AmbiguityChanges, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom, + MemberEvent, MembersResponse, Presence, Rooms, State, StrippedMemberEvent, SyncResponse, + Timeline, + }, + events::{ + presence::PresenceEvent, + room::member::{MemberEventContent, MembershipState}, + AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, + AnyToDeviceEvent, EventContent, EventType, StateEvent, + }, + identifiers::{RoomId, UserId}, + instant::Instant, + locks::RwLock, + push::{Action, PushConditionRoomCtx, Ruleset}, + Raw, UInt, +}; #[cfg(feature = "encryption")] use matrix_sdk_crypto::{ store::{CryptoStore, CryptoStoreError}, @@ -413,20 +415,27 @@ impl BaseClient { self.sync_token.read().await.clone() } + #[allow(clippy::too_many_arguments)] async fn handle_timeline( &self, - room_id: &RoomId, + room: &Room, ruma_timeline: api::sync::sync_events::Timeline, + push_rules: &Ruleset, room_info: &mut RoomInfo, changes: &mut StateChanges, ambiguity_cache: &mut AmbiguityCache, user_ids: &mut BTreeSet, - ) -> StoreResult { + ) -> Result { + let room_id = room.room_id(); + let user_id = room.own_user_id(); let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone()); + let mut push_context = self.get_push_room_context(room, room_info, changes).await?; for event in ruma_timeline.events { match hoist_room_event_prev_content(&event) { Ok(mut e) => { + let mut raw_event = event; + #[allow(clippy::single_match)] match &mut e { AnySyncRoomEvent::State(s) => match s { @@ -475,11 +484,14 @@ impl BaseClient { encrypted, )) => { if let Some(olm) = self.olm_machine().await { - if let Ok(decrypted) = + if let Ok(raw_decrypted) = olm.decrypt_room_event(encrypted, room_id).await { - match decrypted.deserialize() { - Ok(decrypted) => e = decrypted, + match raw_decrypted.deserialize() { + Ok(decrypted) => { + e = decrypted; + raw_event = raw_decrypted; + } Err(e) => { warn!("Error deserializing a decrypted event {:?} ", e) } @@ -494,6 +506,40 @@ impl BaseClient { _ => (), } + if let Some(context) = &mut push_context { + self.update_push_room_context(context, user_id, room_info, changes) + .await; + } else { + push_context = self.get_push_room_context(room, room_info, changes).await?; + } + + if let Some(context) = &push_context { + let actions = push_rules.get_actions(&raw_event, &context).to_vec(); + + if actions.iter().any(|a| matches!(a, Action::Notify)) { + let notification = Notification::new( + actions, + raw_event, + false, + room_id.clone(), + SystemTime::now(), + ); + + match changes.notifications.get_mut(room_id) { + Some(room) => { + room.push(notification); + } + None => { + changes + .notifications + .insert(room_id.clone(), vec![notification]); + } + } + } + // TODO send and store the highlight tweak value with the event. + // Needs to associate custom data with events and to store them. + } + timeline.events.push(e); } Err(e) => { @@ -747,6 +793,11 @@ impl BaseClient { let mut changes = StateChanges::new(next_batch.clone()); let mut ambiguity_cache = AmbiguityCache::new(self.store.clone()); + self.handle_account_data(account_data.events, &mut changes) + .await; + + let push_rules = self.get_push_rules(&changes).await; + let mut new_rooms = Rooms::default(); for (room_id, new_info) in rooms.join { @@ -775,8 +826,9 @@ impl BaseClient { let timeline = self .handle_timeline( - &room_id, + &room, new_info.timeline, + &push_rules, &mut room_info, &mut changes, &mut ambiguity_cache, @@ -845,8 +897,9 @@ impl BaseClient { let timeline = self .handle_timeline( - &room_id, + &room, new_info.timeline, + &push_rules, &mut room_info, &mut changes, &mut ambiguity_cache, @@ -903,9 +956,6 @@ impl BaseClient { changes.presence = presence; - self.handle_account_data(account_data.events, &mut changes) - .await; - changes.ambiguity_maps = ambiguity_cache.cache; self.store.save_changes(&changes).await?; @@ -932,6 +982,7 @@ impl BaseClient { ambiguity_changes: AmbiguityChanges { changes: ambiguity_cache.changes, }, + notifications: changes.notifications, }; Ok(response) @@ -1340,6 +1391,130 @@ impl BaseClient { let olm = self.olm.lock().await; olm.as_ref().cloned() } + + /// Get the push rules. + /// + /// Gets the push rules from `changes` if they have been updated, otherwise get them from the + /// store. As a fallback, uses `Ruleset::server_default`. + pub async fn get_push_rules(&self, changes: &StateChanges) -> Ruleset { + if let Some(AnyBasicEvent::PushRules(event)) = + changes.account_data.get(&EventType::PushRules.to_string()) + { + event.content.global.clone() + } else if let Some(AnyBasicEvent::PushRules(event)) = self + .store + .get_account_data_event(EventType::PushRules) + .await + .unwrap() + { + event.content.global + } else { + // FIXME don't panic if the user is not logged in? + let session = self.get_session().await.unwrap(); + Ruleset::server_default(&session.user_id) + } + } + + /// Get the push context for the given room. + /// + /// Tries to get the data from `changes` or the up to date `room_info`. Loads the data from the + /// store otherwise. + /// + /// Returns `None` if some data couldn't be found. This should only happen in brand new rooms, + /// while we process its state. + pub async fn get_push_room_context( + &self, + room: &Room, + room_info: &RoomInfo, + changes: &StateChanges, + ) -> Result> { + let room_id = room.room_id(); + let user_id = room.own_user_id(); + + let member_count = room_info.active_members_count(); + + let user_display_name = if let Some(member) = changes + .members + .get(room_id) + .and_then(|members| members.get(user_id)) + { + let member = member.clone(); + member + .content + .displayname + .unwrap_or_else(|| user_id.localpart().to_owned()) + } else if let Some(member) = room.get_member(user_id).await? { + member.name().to_owned() + } else { + return Ok(None); + }; + + let room_power_levels = if let Some(AnySyncStateEvent::RoomPowerLevels(event)) = changes + .state + .get(room_id) + .and_then(|types| types.get(&EventType::RoomPowerLevels.to_string())) + .and_then(|events| events.get("")) + { + event.content.clone() + } else if let Some(AnySyncStateEvent::RoomPowerLevels(event)) = self + .store + .get_state_event(room_id, EventType::RoomPowerLevels, "") + .await? + { + event.content + } else { + return Ok(None); + }; + + Ok(Some(PushConditionRoomCtx { + room_id: room_id.clone(), + member_count: UInt::new(member_count).unwrap(), + user_display_name, + users_power_levels: room_power_levels.users, + default_power_level: room_power_levels.users_default, + notification_power_levels: room_power_levels.notifications, + })) + } + + /// Update the push context for the given room. + /// + /// Updates the context data from `changes` or `room_info`. + pub async fn update_push_room_context( + &self, + push_rules: &mut PushConditionRoomCtx, + user_id: &UserId, + room_info: &RoomInfo, + changes: &StateChanges, + ) { + let room_id = &room_info.room_id; + + push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap(); + + if let Some(member) = changes + .members + .get(room_id) + .and_then(|members| members.get(user_id)) + { + let member = member.clone(); + push_rules.user_display_name = member + .content + .displayname + .unwrap_or_else(|| user_id.localpart().to_owned()) + } + + if let Some(AnySyncStateEvent::RoomPowerLevels(event)) = changes + .state + .get(room_id) + .and_then(|types| types.get(&EventType::RoomPowerLevels.to_string())) + .and_then(|events| events.get("")) + { + let room_power_levels = event.content.clone(); + + push_rules.users_power_levels = room_power_levels.users; + push_rules.default_power_level = room_power_levels.users_default; + push_rules.notification_power_levels = room_power_levels.notifications; + } + } } #[cfg(test)] diff --git a/matrix_sdk_base/src/rooms/normal.rs b/matrix_sdk_base/src/rooms/normal.rs index 53758d31..54d06a50 100644 --- a/matrix_sdk_base/src/rooms/normal.rs +++ b/matrix_sdk_base/src/rooms/normal.rs @@ -533,4 +533,9 @@ impl RoomInfo { changed } + + /// The number of active members (invited + joined) in the room. + pub fn active_members_count(&self) -> u64 { + self.summary.joined_member_count + self.summary.invited_member_count + } } diff --git a/matrix_sdk_base/src/store/memory_store.rs b/matrix_sdk_base/src/store/memory_store.rs index f2772ca8..33ffd1a5 100644 --- a/matrix_sdk_base/src/store/memory_store.rs +++ b/matrix_sdk_base/src/store/memory_store.rs @@ -303,6 +303,13 @@ impl MemoryStore { #[allow(clippy::map_clone)] self.stripped_room_info.iter().map(|r| r.clone()).collect() } + + async fn get_account_data_event(&self, event_type: EventType) -> Result> { + Ok(self + .account_data + .get(event_type.as_ref()) + .map(|e| e.clone())) + } } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -385,4 +392,8 @@ impl StateStore for MemoryStore { .and_then(|d| d.get(display_name).map(|d| d.clone())) .unwrap_or_default()) } + + async fn get_account_data_event(&self, event_type: EventType) -> Result> { + self.get_account_data_event(event_type).await + } } diff --git a/matrix_sdk_base/src/store/mod.rs b/matrix_sdk_base/src/store/mod.rs index bab5734b..95365a1a 100644 --- a/matrix_sdk_base/src/store/mod.rs +++ b/matrix_sdk_base/src/store/mod.rs @@ -23,6 +23,7 @@ use std::path::Path; use dashmap::DashMap; use matrix_sdk_common::{ + api::r0::push::get_notifications::Notification, async_trait, events::{ presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent, @@ -185,6 +186,13 @@ pub trait StateStore: AsyncTraitDeps { room_id: &RoomId, display_name: &str, ) -> Result>; + + /// Get an event out of the account data store. + /// + /// # Arguments + /// + /// * `event_type` - The event type of the account data event. + async fn get_account_data_event(&self, event_type: EventType) -> Result>; } /// A state store wrapper for the SDK. @@ -360,6 +368,9 @@ pub struct StateChanges { pub stripped_members: BTreeMap>, /// A map of `RoomId` to `RoomInfo`. pub invited_room_info: BTreeMap, + + /// A map of `RoomId` to a vector of `Notification`s + pub notifications: BTreeMap>, } impl StateChanges { diff --git a/matrix_sdk_base/src/store/sled_store/mod.rs b/matrix_sdk_base/src/store/sled_store/mod.rs index ddded62b..3076ec15 100644 --- a/matrix_sdk_base/src/store/sled_store/mod.rs +++ b/matrix_sdk_base/src/store/sled_store/mod.rs @@ -31,7 +31,7 @@ use matrix_sdk_common::{ events::{ presence::PresenceEvent, room::member::{MemberEventContent, MembershipState}, - AnySyncStateEvent, EventContent, EventType, + AnyBasicEvent, AnySyncStateEvent, EventContent, EventType, }, identifiers::{RoomId, UserId}, }; @@ -587,6 +587,17 @@ impl SledStore { .transpose()? .unwrap_or_default()) } + + pub async fn get_account_data_event( + &self, + event_type: EventType, + ) -> Result> { + Ok(self + .account_data + .get(event_type.to_string().as_str().encode())? + .map(|m| self.deserialize_event(&m)) + .transpose()?) + } } #[async_trait] @@ -664,6 +675,10 @@ impl StateStore for SledStore { self.get_users_with_display_name(room_id, display_name) .await } + + async fn get_account_data_event(&self, event_type: EventType) -> Result> { + self.get_account_data_event(event_type).await + } } #[cfg(test)] diff --git a/matrix_sdk_common/src/deserialized_responses.rs b/matrix_sdk_common/src/deserialized_responses.rs index 275cb367..45a50380 100644 --- a/matrix_sdk_common/src/deserialized_responses.rs +++ b/matrix_sdk_common/src/deserialized_responses.rs @@ -2,8 +2,11 @@ use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, convert::TryFrom, time::SystemTime}; use super::{ - api::r0::sync::sync_events::{ - DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount, + api::r0::{ + push::get_notifications::Notification, + sync::sync_events::{ + DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount, + }, }, events::{ presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent, @@ -55,6 +58,8 @@ pub struct SyncResponse { pub device_one_time_keys_count: BTreeMap, /// Collection of ambiguioty changes that room member events trigger. pub ambiguity_changes: AmbiguityChanges, + /// New notifications per room. + pub notifications: BTreeMap>, } impl SyncResponse { diff --git a/matrix_sdk_test/src/test_json/sync.rs b/matrix_sdk_test/src/test_json/sync.rs index f945cd64..fb6dae52 100644 --- a/matrix_sdk_test/src/test_json/sync.rs +++ b/matrix_sdk_test/src/test_json/sync.rs @@ -543,7 +543,7 @@ lazy_static! { lazy_static! { pub static ref MORE_SYNC: JsonValue = json!({ "device_one_time_keys_count": {}, - "next_batch": "s526_47314_0_7_1_1_1_11444_1", + "next_batch": "s526_47314_0_7_1_1_1_11444_2", "device_lists": { "changed": [ "@example:example.org" @@ -676,7 +676,22 @@ lazy_static! { "unsigned": { "age": 85 } - } + }, + { + "content": { + "body": "This is a notice", + "format": "org.matrix.custom.html", + "formatted_body": "This is a notice", + "msgtype": "m.notice" + }, + "event_id": "$098237280074GZeOm:localhost", + "origin_server_ts": 162037280, + "sender": "@bot:localhost", + "type": "m.room.message", + "unsigned": { + "age": 25 + } + }, ], "limited": true, "prev_batch": "t392-516_47314_0_7_1_1_1_11444_1"