matrix-sdk: Get notifications locally on sync

This commit is contained in:
Kévin Commaille 2021-04-12 11:09:39 +02:00 committed by Kévin Commaille
parent ab180362c9
commit 3f2c5d22b6
No known key found for this signature in database
GPG key ID: 296D60AE1E61661C
8 changed files with 340 additions and 41 deletions

View file

@ -14,7 +14,9 @@
// limitations under the License.
use std::ops::Deref;
use matrix_sdk_common::{events::AnySyncRoomEvent, identifiers::RoomId};
use matrix_sdk_common::{
api::r0::push::get_notifications::Notification, events::AnySyncRoomEvent, identifiers::RoomId,
};
use serde_json::value::RawValue as RawJsonValue;
use crate::{
@ -118,6 +120,15 @@ impl Handler {
for event in &response.presence.events {
self.on_presence_event(event).await;
}
for (room_id, notifications) in &response.notifications {
if let Some(room) = self.get_room(&room_id) {
for notification in notifications {
self.on_room_notification(room.clone(), notification.clone())
.await;
}
}
}
}
async fn handle_timeline_event(&self, room: Room, event: &AnySyncRoomEvent) {
@ -348,6 +359,10 @@ pub trait EventHandler: Send + Sync {
/// Fires when `Client` receives a `RoomEvent::Tombstone` event.
async fn on_room_tombstone(&self, _: Room, _: &SyncStateEvent<TombstoneEventContent>) {}
/// Fires when `Client` receives room events that trigger notifications according to
/// the push rules of the user.
async fn on_room_notification(&self, _: Room, _: Notification) {}
// `RoomEvent`s from `IncomingState`
/// Fires when `Client` receives a `StateEvent::RoomMember` event.
async fn on_state_member(&self, _: Room, _: &SyncStateEvent<MemberEventContent>) {}
@ -667,6 +682,9 @@ mod test {
async fn on_custom_event(&self, _: Room, _: &CustomEvent<'_>) {
self.0.lock().await.push("custom event".to_string())
}
async fn on_room_notification(&self, _: Room, _: Notification) {
self.0.lock().await.push("notification".to_string())
}
}
use crate::{identifiers::user_id, Client, Session, SyncSettings};
@ -674,7 +692,7 @@ mod test {
async fn get_client() -> Client {
let session = Session {
access_token: "1234".to_owned(),
user_id: user_id!("@example:example.com"),
user_id: user_id!("@example:localhost"),
device_id: "DEVICEID".into(),
};
let homeserver = url::Url::parse(&mockito::server_url()).unwrap();
@ -683,7 +701,7 @@ mod test {
client
}
async fn mock_sync(client: Client, response: String) {
async fn mock_sync(client: &Client, response: String) {
let _m = mock(
"GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
@ -705,7 +723,7 @@ mod test {
let client = get_client().await;
client.set_event_handler(handler).await;
mock_sync(client, test_json::SYNC.to_string()).await;
mock_sync(&client, test_json::SYNC.to_string()).await;
let v = test_vec.lock().await;
assert_eq!(
@ -723,6 +741,7 @@ mod test {
"state member",
"message",
"presence event",
"notification",
],
)
}
@ -735,7 +754,7 @@ mod test {
let client = get_client().await;
client.set_event_handler(handler).await;
mock_sync(client, test_json::INVITE_SYNC.to_string()).await;
mock_sync(&client, test_json::INVITE_SYNC.to_string()).await;
let v = test_vec.lock().await;
assert_eq!(
@ -756,7 +775,7 @@ mod test {
let client = get_client().await;
client.set_event_handler(handler).await;
mock_sync(client, test_json::LEAVE_SYNC.to_string()).await;
mock_sync(&client, test_json::LEAVE_SYNC.to_string()).await;
let v = test_vec.lock().await;
assert_eq!(
@ -772,6 +791,7 @@ mod test {
"state member",
"message",
"presence event",
"notification",
],
)
}
@ -784,7 +804,7 @@ mod test {
let client = get_client().await;
client.set_event_handler(handler).await;
mock_sync(client, test_json::MORE_SYNC.to_string()).await;
mock_sync(&client, test_json::MORE_SYNC.to_string()).await;
let v = test_vec.lock().await;
assert_eq!(
@ -795,6 +815,7 @@ mod test {
"message",
"message", // this is a message edit event
"redaction",
"message", // this is a notice event
],
)
}
@ -807,7 +828,7 @@ mod test {
let client = get_client().await;
client.set_event_handler(handler).await;
mock_sync(client, test_json::VOIP_SYNC.to_string()).await;
mock_sync(&client, test_json::VOIP_SYNC.to_string()).await;
let v = test_vec.lock().await;
assert_eq!(
@ -820,4 +841,45 @@ mod test {
],
)
}
#[async_test]
async fn event_handler_two_syncs() {
let vec = Arc::new(Mutex::new(Vec::new()));
let test_vec = Arc::clone(&vec);
let handler = Box::new(EvHandlerTest(vec));
let client = get_client().await;
client.set_event_handler(handler).await;
mock_sync(&client, test_json::SYNC.to_string()).await;
mock_sync(&client, test_json::MORE_SYNC.to_string()).await;
let v = test_vec.lock().await;
assert_eq!(
v.as_slice(),
[
"receipt event",
"account read",
"account ignore",
"state rules",
"state member",
"state aliases",
"state power",
"state canonical",
"state member",
"state member",
"message",
"presence event",
"notification",
"receipt event",
"typing event",
"message",
"message", // this is a message edit event
"redaction",
"message", // this is a notice event
"notification",
"notification",
"notification",
],
)
}
}

View file

@ -20,26 +20,9 @@ use std::{
path::{Path, PathBuf},
result::Result as StdResult,
sync::Arc,
time::SystemTime,
};
use matrix_sdk_common::{
api::r0 as api,
deserialized_responses::{
AccountData, AmbiguityChanges, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
MemberEvent, MembersResponse, Presence, Rooms, State, StrippedMemberEvent, SyncResponse,
Timeline,
},
events::{
presence::PresenceEvent,
room::member::{MemberEventContent, MembershipState},
AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent,
AnyToDeviceEvent, EventContent, StateEvent,
},
identifiers::{RoomId, UserId},
instant::Instant,
locks::RwLock,
Raw,
};
#[cfg(feature = "encryption")]
use matrix_sdk_common::{
api::r0::keys::claim_keys::Request as KeysClaimRequest,
@ -51,6 +34,25 @@ use matrix_sdk_common::{
locks::Mutex,
uuid::Uuid,
};
use matrix_sdk_common::{
api::r0::{self as api, push::get_notifications::Notification},
deserialized_responses::{
AccountData, AmbiguityChanges, Ephemeral, InviteState, InvitedRoom, JoinedRoom, LeftRoom,
MemberEvent, MembersResponse, Presence, Rooms, State, StrippedMemberEvent, SyncResponse,
Timeline,
},
events::{
presence::PresenceEvent,
room::member::{MemberEventContent, MembershipState},
AnyBasicEvent, AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent,
AnyToDeviceEvent, EventContent, EventType, StateEvent,
},
identifiers::{RoomId, UserId},
instant::Instant,
locks::RwLock,
push::{Action, PushConditionRoomCtx, Ruleset},
Raw, UInt,
};
#[cfg(feature = "encryption")]
use matrix_sdk_crypto::{
store::{CryptoStore, CryptoStoreError},
@ -413,20 +415,27 @@ impl BaseClient {
self.sync_token.read().await.clone()
}
#[allow(clippy::too_many_arguments)]
async fn handle_timeline(
&self,
room_id: &RoomId,
room: &Room,
ruma_timeline: api::sync::sync_events::Timeline,
push_rules: &Ruleset,
room_info: &mut RoomInfo,
changes: &mut StateChanges,
ambiguity_cache: &mut AmbiguityCache,
user_ids: &mut BTreeSet<UserId>,
) -> StoreResult<Timeline> {
) -> Result<Timeline> {
let room_id = room.room_id();
let user_id = room.own_user_id();
let mut timeline = Timeline::new(ruma_timeline.limited, ruma_timeline.prev_batch.clone());
let mut push_context = self.get_push_room_context(room, room_info, changes).await?;
for event in ruma_timeline.events {
match hoist_room_event_prev_content(&event) {
Ok(mut e) => {
let mut raw_event = event;
#[allow(clippy::single_match)]
match &mut e {
AnySyncRoomEvent::State(s) => match s {
@ -475,11 +484,14 @@ impl BaseClient {
encrypted,
)) => {
if let Some(olm) = self.olm_machine().await {
if let Ok(decrypted) =
if let Ok(raw_decrypted) =
olm.decrypt_room_event(encrypted, room_id).await
{
match decrypted.deserialize() {
Ok(decrypted) => e = decrypted,
match raw_decrypted.deserialize() {
Ok(decrypted) => {
e = decrypted;
raw_event = raw_decrypted;
}
Err(e) => {
warn!("Error deserializing a decrypted event {:?} ", e)
}
@ -494,6 +506,40 @@ impl BaseClient {
_ => (),
}
if let Some(context) = &mut push_context {
self.update_push_room_context(context, user_id, room_info, changes)
.await;
} else {
push_context = self.get_push_room_context(room, room_info, changes).await?;
}
if let Some(context) = &push_context {
let actions = push_rules.get_actions(&raw_event, &context).to_vec();
if actions.iter().any(|a| matches!(a, Action::Notify)) {
let notification = Notification::new(
actions,
raw_event,
false,
room_id.clone(),
SystemTime::now(),
);
match changes.notifications.get_mut(room_id) {
Some(room) => {
room.push(notification);
}
None => {
changes
.notifications
.insert(room_id.clone(), vec![notification]);
}
}
}
// TODO send and store the highlight tweak value with the event.
// Needs to associate custom data with events and to store them.
}
timeline.events.push(e);
}
Err(e) => {
@ -747,6 +793,11 @@ impl BaseClient {
let mut changes = StateChanges::new(next_batch.clone());
let mut ambiguity_cache = AmbiguityCache::new(self.store.clone());
self.handle_account_data(account_data.events, &mut changes)
.await;
let push_rules = self.get_push_rules(&changes).await;
let mut new_rooms = Rooms::default();
for (room_id, new_info) in rooms.join {
@ -775,8 +826,9 @@ impl BaseClient {
let timeline = self
.handle_timeline(
&room_id,
&room,
new_info.timeline,
&push_rules,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
@ -845,8 +897,9 @@ impl BaseClient {
let timeline = self
.handle_timeline(
&room_id,
&room,
new_info.timeline,
&push_rules,
&mut room_info,
&mut changes,
&mut ambiguity_cache,
@ -903,9 +956,6 @@ impl BaseClient {
changes.presence = presence;
self.handle_account_data(account_data.events, &mut changes)
.await;
changes.ambiguity_maps = ambiguity_cache.cache;
self.store.save_changes(&changes).await?;
@ -932,6 +982,7 @@ impl BaseClient {
ambiguity_changes: AmbiguityChanges {
changes: ambiguity_cache.changes,
},
notifications: changes.notifications,
};
Ok(response)
@ -1340,6 +1391,130 @@ impl BaseClient {
let olm = self.olm.lock().await;
olm.as_ref().cloned()
}
/// Get the push rules.
///
/// Gets the push rules from `changes` if they have been updated, otherwise get them from the
/// store. As a fallback, uses `Ruleset::server_default`.
pub async fn get_push_rules(&self, changes: &StateChanges) -> Ruleset {
if let Some(AnyBasicEvent::PushRules(event)) =
changes.account_data.get(&EventType::PushRules.to_string())
{
event.content.global.clone()
} else if let Some(AnyBasicEvent::PushRules(event)) = self
.store
.get_account_data_event(EventType::PushRules)
.await
.unwrap()
{
event.content.global
} else {
// FIXME don't panic if the user is not logged in?
let session = self.get_session().await.unwrap();
Ruleset::server_default(&session.user_id)
}
}
/// Get the push context for the given room.
///
/// Tries to get the data from `changes` or the up to date `room_info`. Loads the data from the
/// store otherwise.
///
/// Returns `None` if some data couldn't be found. This should only happen in brand new rooms,
/// while we process its state.
pub async fn get_push_room_context(
&self,
room: &Room,
room_info: &RoomInfo,
changes: &StateChanges,
) -> Result<Option<PushConditionRoomCtx>> {
let room_id = room.room_id();
let user_id = room.own_user_id();
let member_count = room_info.active_members_count();
let user_display_name = if let Some(member) = changes
.members
.get(room_id)
.and_then(|members| members.get(user_id))
{
let member = member.clone();
member
.content
.displayname
.unwrap_or_else(|| user_id.localpart().to_owned())
} else if let Some(member) = room.get_member(user_id).await? {
member.name().to_owned()
} else {
return Ok(None);
};
let room_power_levels = if let Some(AnySyncStateEvent::RoomPowerLevels(event)) = changes
.state
.get(room_id)
.and_then(|types| types.get(&EventType::RoomPowerLevels.to_string()))
.and_then(|events| events.get(""))
{
event.content.clone()
} else if let Some(AnySyncStateEvent::RoomPowerLevels(event)) = self
.store
.get_state_event(room_id, EventType::RoomPowerLevels, "")
.await?
{
event.content
} else {
return Ok(None);
};
Ok(Some(PushConditionRoomCtx {
room_id: room_id.clone(),
member_count: UInt::new(member_count).unwrap(),
user_display_name,
users_power_levels: room_power_levels.users,
default_power_level: room_power_levels.users_default,
notification_power_levels: room_power_levels.notifications,
}))
}
/// Update the push context for the given room.
///
/// Updates the context data from `changes` or `room_info`.
pub async fn update_push_room_context(
&self,
push_rules: &mut PushConditionRoomCtx,
user_id: &UserId,
room_info: &RoomInfo,
changes: &StateChanges,
) {
let room_id = &room_info.room_id;
push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap();
if let Some(member) = changes
.members
.get(room_id)
.and_then(|members| members.get(user_id))
{
let member = member.clone();
push_rules.user_display_name = member
.content
.displayname
.unwrap_or_else(|| user_id.localpart().to_owned())
}
if let Some(AnySyncStateEvent::RoomPowerLevels(event)) = changes
.state
.get(room_id)
.and_then(|types| types.get(&EventType::RoomPowerLevels.to_string()))
.and_then(|events| events.get(""))
{
let room_power_levels = event.content.clone();
push_rules.users_power_levels = room_power_levels.users;
push_rules.default_power_level = room_power_levels.users_default;
push_rules.notification_power_levels = room_power_levels.notifications;
}
}
}
#[cfg(test)]

View file

@ -533,4 +533,9 @@ impl RoomInfo {
changed
}
/// The number of active members (invited + joined) in the room.
pub fn active_members_count(&self) -> u64 {
self.summary.joined_member_count + self.summary.invited_member_count
}
}

View file

@ -303,6 +303,13 @@ impl MemoryStore {
#[allow(clippy::map_clone)]
self.stripped_room_info.iter().map(|r| r.clone()).collect()
}
async fn get_account_data_event(&self, event_type: EventType) -> Result<Option<AnyBasicEvent>> {
Ok(self
.account_data
.get(event_type.as_ref())
.map(|e| e.clone()))
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@ -385,4 +392,8 @@ impl StateStore for MemoryStore {
.and_then(|d| d.get(display_name).map(|d| d.clone()))
.unwrap_or_default())
}
async fn get_account_data_event(&self, event_type: EventType) -> Result<Option<AnyBasicEvent>> {
self.get_account_data_event(event_type).await
}
}

View file

@ -23,6 +23,7 @@ use std::path::Path;
use dashmap::DashMap;
use matrix_sdk_common::{
api::r0::push::get_notifications::Notification,
async_trait,
events::{
presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent,
@ -185,6 +186,13 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
display_name: &str,
) -> Result<BTreeSet<UserId>>;
/// Get an event out of the account data store.
///
/// # Arguments
///
/// * `event_type` - The event type of the account data event.
async fn get_account_data_event(&self, event_type: EventType) -> Result<Option<AnyBasicEvent>>;
}
/// A state store wrapper for the SDK.
@ -360,6 +368,9 @@ pub struct StateChanges {
pub stripped_members: BTreeMap<RoomId, BTreeMap<UserId, StrippedMemberEvent>>,
/// A map of `RoomId` to `RoomInfo`.
pub invited_room_info: BTreeMap<RoomId, RoomInfo>,
/// A map of `RoomId` to a vector of `Notification`s
pub notifications: BTreeMap<RoomId, Vec<Notification>>,
}
impl StateChanges {

View file

@ -31,7 +31,7 @@ use matrix_sdk_common::{
events::{
presence::PresenceEvent,
room::member::{MemberEventContent, MembershipState},
AnySyncStateEvent, EventContent, EventType,
AnyBasicEvent, AnySyncStateEvent, EventContent, EventType,
},
identifiers::{RoomId, UserId},
};
@ -587,6 +587,17 @@ impl SledStore {
.transpose()?
.unwrap_or_default())
}
pub async fn get_account_data_event(
&self,
event_type: EventType,
) -> Result<Option<AnyBasicEvent>> {
Ok(self
.account_data
.get(event_type.to_string().as_str().encode())?
.map(|m| self.deserialize_event(&m))
.transpose()?)
}
}
#[async_trait]
@ -664,6 +675,10 @@ impl StateStore for SledStore {
self.get_users_with_display_name(room_id, display_name)
.await
}
async fn get_account_data_event(&self, event_type: EventType) -> Result<Option<AnyBasicEvent>> {
self.get_account_data_event(event_type).await
}
}
#[cfg(test)]

View file

@ -2,8 +2,11 @@ use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, convert::TryFrom, time::SystemTime};
use super::{
api::r0::sync::sync_events::{
DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount,
api::r0::{
push::get_notifications::Notification,
sync::sync_events::{
DeviceLists, UnreadNotificationsCount as RumaUnreadNotificationsCount,
},
},
events::{
presence::PresenceEvent, room::member::MemberEventContent, AnyBasicEvent,
@ -55,6 +58,8 @@ pub struct SyncResponse {
pub device_one_time_keys_count: BTreeMap<DeviceKeyAlgorithm, u64>,
/// Collection of ambiguioty changes that room member events trigger.
pub ambiguity_changes: AmbiguityChanges,
/// New notifications per room.
pub notifications: BTreeMap<RoomId, Vec<Notification>>,
}
impl SyncResponse {

View file

@ -543,7 +543,7 @@ lazy_static! {
lazy_static! {
pub static ref MORE_SYNC: JsonValue = json!({
"device_one_time_keys_count": {},
"next_batch": "s526_47314_0_7_1_1_1_11444_1",
"next_batch": "s526_47314_0_7_1_1_1_11444_2",
"device_lists": {
"changed": [
"@example:example.org"
@ -676,7 +676,22 @@ lazy_static! {
"unsigned": {
"age": 85
}
}
},
{
"content": {
"body": "This is a notice",
"format": "org.matrix.custom.html",
"formatted_body": "<em>This is a notice</em>",
"msgtype": "m.notice"
},
"event_id": "$098237280074GZeOm:localhost",
"origin_server_ts": 162037280,
"sender": "@bot:localhost",
"type": "m.room.message",
"unsigned": {
"age": 25
}
},
],
"limited": true,
"prev_batch": "t392-516_47314_0_7_1_1_1_11444_1"