From 662a0cf1df7c33ec1f3c00b6f0a722cdbc03a3d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 12 Apr 2021 12:40:16 +0200 Subject: [PATCH] improvement: better and more efficient message count calculation --- src/client_server/membership.rs | 5 +- src/client_server/read_marker.rs | 4 ++ src/client_server/sync.rs | 38 ++++++------ src/database.rs | 3 + src/database/pusher.rs | 54 +++++++++++------ src/database/rooms.rs | 101 +++++++++++++++++++++++++++++-- src/database/sending.rs | 32 +++------- 7 files changed, 169 insertions(+), 68 deletions(-) diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 149df93..e816005 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -596,7 +596,10 @@ async fn join_room_by_id_helper( if let Some(state_key) = &pdu.state_key { if pdu.kind == EventType::RoomMember { let target_user_id = UserId::try_from(state_key.clone()).map_err(|e| { - warn!("Invalid user id in send_join response: {}: {}", state_key, e); + warn!( + "Invalid user id in send_join response: {}: {}", + state_key, e + ); Error::BadServerResponse("Invalid user id in send_join response.") })?; diff --git a/src/client_server/read_marker.rs b/src/client_server/read_marker.rs index 555b7e7..166e59a 100644 --- a/src/client_server/read_marker.rs +++ b/src/client_server/read_marker.rs @@ -47,6 +47,8 @@ pub async fn set_read_marker_route( ))?, &db.globals, )?; + db.rooms + .reset_notification_counts(&sender_user, &body.room_id)?; let mut user_receipts = BTreeMap::new(); user_receipts.insert( @@ -103,6 +105,8 @@ pub async fn create_receipt_route( ))?, &db.globals, )?; + db.rooms + .reset_notification_counts(&sender_user, &body.room_id)?; let mut user_receipts = BTreeMap::new(); user_receipts.insert( diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index f1ad9a5..fe14208 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -12,7 +12,7 @@ use ruma::{ use rocket::{get, tokio}; use std::{ collections::{hash_map, BTreeMap, HashMap, HashSet}, - convert::TryFrom, + convert::{TryFrom, TryInto}, time::Duration, }; @@ -370,23 +370,23 @@ pub async fn sync_events_route( ); let notification_count = if send_notification_counts { - if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_user)? { - Some( - (db.rooms - .pdus_since(&sender_user, &room_id, last_read)? - .filter_map(|pdu| pdu.ok()) // Filter out buggy events - .filter(|(_, pdu)| { - matches!( - pdu.kind.clone(), - EventType::RoomMessage | EventType::RoomEncrypted - ) - }) - .count() as u32) - .into(), - ) - } else { - None - } + Some( + db.rooms + .notification_count(&sender_user, &room_id)? + .try_into() + .expect("notification count can't go that high"), + ) + } else { + None + }; + + let highlight_count = if send_notification_counts { + Some( + db.rooms + .highlight_count(&sender_user, &room_id)? + .try_into() + .expect("highlight count can't go that high"), + ) } else { None }; @@ -440,7 +440,7 @@ pub async fn sync_events_route( invited_member_count: invited_member_count.map(|n| (n as u32).into()), }, unread_notifications: sync_events::UnreadNotificationsCount { - highlight_count: None, + highlight_count, notification_count, }, timeline: sync_events::Timeline { diff --git a/src/database.rs b/src/database.rs index 211c3f4..9d629dd 100644 --- a/src/database.rs +++ b/src/database.rs @@ -165,6 +165,9 @@ impl Database { roomuserid_invitecount: db.open_tree("roomuserid_invitecount")?, userroomid_left: db.open_tree("userroomid_left")?, + userroomid_notificationcount: db.open_tree("userroomid_notificationcount")?, + userroomid_highlightcount: db.open_tree("userroomid_highlightcount")?, + statekey_shortstatekey: db.open_tree("statekey_shortstatekey")?, stateid_shorteventid: db.open_tree("stateid_shorteventid")?, eventid_shorteventid: db.open_tree("eventid_shorteventid")?, diff --git a/src/database/pusher.rs b/src/database/pusher.rs index f4c02d0..e2bd3f1 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -174,29 +174,10 @@ pub async fn send_push_notice( pdu: &PduEvent, db: &Database, ) -> Result<()> { - let power_levels: PowerLevelsEventContent = db - .rooms - .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? - .map(|ev| { - serde_json::from_value(ev.content) - .map_err(|_| Error::bad_database("invalid m.room.power_levels event")) - }) - .transpose()? - .unwrap_or_default(); - - let ctx = PushConditionRoomCtx { - room_id: pdu.room_id.clone(), - member_count: (db.rooms.room_members(&pdu.room_id).count() as u32).into(), - user_display_name: user.localpart().into(), // TODO: Use actual display name - users_power_levels: power_levels.users, - default_power_level: power_levels.users_default, - notification_power_levels: power_levels.notifications, - }; - let mut notify = None; let mut tweaks = Vec::new(); - for action in ruleset.get_actions(&pdu.to_sync_room_event(), &ctx) { + for action in get_actions(user, &ruleset, pdu, db)? { let n = match action { Action::DontNotify => false, // TODO: Implement proper support for coalesce @@ -224,6 +205,39 @@ pub async fn send_push_notice( Ok(()) } +pub fn get_actions<'a>( + user: &UserId, + ruleset: &'a Ruleset, + pdu: &PduEvent, + db: &Database, +) -> Result> { + let power_levels: PowerLevelsEventContent = db + .rooms + .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? + .map(|ev| { + serde_json::from_value(ev.content) + .map_err(|_| Error::bad_database("invalid m.room.power_levels event")) + }) + .transpose()? + .unwrap_or_default(); + + let ctx = PushConditionRoomCtx { + room_id: pdu.room_id.clone(), + member_count: (db.rooms.room_members(&pdu.room_id).count() as u32).into(), + user_display_name: db + .users + .displayname(&user)? + .unwrap_or(user.localpart().to_owned()), + users_power_levels: power_levels.users, + default_power_level: power_levels.users_default, + notification_power_levels: power_levels.notifications, + }; + + Ok(ruleset + .get_actions(&pdu.to_sync_room_event(), &ctx) + .map(Clone::clone)) +} + async fn send_notice( unread: UInt, pusher: &Pusher, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 09944cf..3f37de6 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -9,10 +9,11 @@ use ring::digest; use ruma::{ api::client::error::ErrorKind, events::{ - ignored_user_list, + ignored_user_list, push_rules, room::{create::CreateEventContent, member, message}, AnyStrippedStateEvent, EventType, }, + push::{self, Action, Tweak}, serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw}, uint, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; @@ -26,7 +27,7 @@ use std::{ sync::Arc, }; -use super::admin::AdminCommand; +use super::{admin::AdminCommand, pusher}; /// The unique identifier of each state group. /// @@ -51,10 +52,13 @@ pub struct Rooms { pub(super) userroomid_joined: sled::Tree, pub(super) roomuserid_joined: sled::Tree, pub(super) roomuseroncejoinedids: sled::Tree, - pub(super) userroomid_invitestate: sled::Tree, - pub(super) roomuserid_invitecount: sled::Tree, + pub(super) userroomid_invitestate: sled::Tree, // InviteState = Vec> + pub(super) roomuserid_invitecount: sled::Tree, // InviteCount = Count pub(super) userroomid_left: sled::Tree, + pub(super) userroomid_notificationcount: sled::Tree, // NotifyCount = u64 + pub(super) userroomid_highlightcount: sled::Tree, // HightlightCount = u64 + /// Remember the current state hash of a room. pub(super) roomid_shortstatehash: sled::Tree, /// Remember the state hash at events in the past. @@ -649,6 +653,7 @@ impl Rooms { // fails self.edus .private_read_set(&pdu.room_id, &pdu.sender, count, &db.globals)?; + self.reset_notification_counts(&pdu.sender, &pdu.room_id)?; self.pduid_pdu.insert( &pdu_id, @@ -673,6 +678,45 @@ impl Rooms { continue; } + let rules_for_user = db + .account_data + .get::(None, &user, EventType::PushRules)? + .map(|ev| ev.content.global) + .unwrap_or_else(|| push::Ruleset::server_default(&user)); + + let mut highlight = false; + let mut notify = false; + + for action in pusher::get_actions(&user, &rules_for_user, pdu, db)? { + match action { + Action::DontNotify => notify = false, + // TODO: Implement proper support for coalesce + Action::Notify | Action::Coalesce => notify = true, + Action::SetTweak(Tweak::Highlight(true)) => { + highlight = true; + } + _ => {} + }; + } + + let mut userroom_id = user.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(pdu.room_id.as_bytes()); + + if notify { + &self + .userroomid_notificationcount + .update_and_fetch(&userroom_id, utils::increment)? + .expect("utils::increment will always put in a value"); + } + + if highlight { + &self + .userroomid_highlightcount + .update_and_fetch(&userroom_id, utils::increment)? + .expect("utils::increment will always put in a value"); + } + for senderkey in db .pusher .get_pusher_senderkeys(&user) @@ -738,6 +782,14 @@ impl Rooms { { state.push(e.to_stripped_state_event()); } + if let Some(e) = + self.room_state_get(&pdu.room_id, &EventType::RoomMember, pdu.sender.as_str())? + { + state.push(e.to_stripped_state_event()); + } + + state.push(pdu.to_stripped_state_event()); + Some(state) } _ => None, @@ -844,6 +896,47 @@ impl Rooms { Ok(()) } + pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + self.userroomid_notificationcount + .insert(&userroom_id, &0_u64.to_be_bytes())?; + self.userroomid_highlightcount + .insert(&userroom_id, &0_u64.to_be_bytes())?; + + Ok(()) + } + + pub fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + self.userroomid_notificationcount + .get(&userroom_id)? + .map(|bytes| { + utils::u64_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid notification count in db.")) + }) + .unwrap_or(Ok(0)) + } + + pub fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut userroom_id = user_id.as_bytes().to_vec(); + userroom_id.push(0xff); + userroom_id.extend_from_slice(room_id.as_bytes()); + + self.userroomid_highlightcount + .get(&userroom_id)? + .map(|bytes| { + utils::u64_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid highlight count in db.")) + }) + .unwrap_or(Ok(0)) + } + /// Generates a new StateHash and associates it with the incoming event. /// /// This adds all current state events (not including the incoming event) diff --git a/src/database/sending.rs b/src/database/sending.rs index d6dcead..ffd3ed6 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -1,6 +1,6 @@ use std::{ collections::HashMap, - convert::TryFrom, + convert::{TryFrom, TryInto}, fmt::Debug, sync::Arc, time::{Duration, Instant, SystemTime}, @@ -16,7 +16,7 @@ use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ api::{appservice, federation, OutgoingRequest}, events::{push_rules, EventType}, - push, uint, ServerName, UInt, UserId, + push, ServerName, UInt, UserId, }; use sled::IVec; use tokio::{select, sync::Semaphore}; @@ -432,32 +432,16 @@ impl Sending { let rules_for_user = db .account_data .get::(None, &userid, EventType::PushRules) - .map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))? + .unwrap_or_default() .map(|ev| ev.content.global) .unwrap_or_else(|| push::Ruleset::server_default(&userid)); - let unread: UInt = if let Some(last_read) = db + let unread: UInt = db .rooms - .edus - .private_read_get(&pdu.room_id, &userid) - .map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))? - { - (db.rooms - .pdus_since(&userid, &pdu.room_id, last_read) - .map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))? - .filter_map(|pdu| pdu.ok()) // Filter out buggy events - .filter(|(_, pdu)| { - matches!( - pdu.kind.clone(), - EventType::RoomMessage | EventType::RoomEncrypted - ) - }) - .count() as u32) - .into() - } else { - // Just return zero unread messages - uint!(0) - }; + .notification_count(&userid, &pdu.room_id) + .map_err(|e| (kind.clone(), e))? + .try_into() + .expect("notifiation count can't go that high"); let permit = db.sending.maximum_requests.acquire().await;