fix: send notification count updates when private read receipts change
This commit is contained in:
		
							parent
							
								
									9aa5e99a0f
								
							
						
					
					
						commit
						33215d6099
					
				
					 6 changed files with 107 additions and 85 deletions
				
			
		|  | @ -34,13 +34,14 @@ pub fn set_read_marker_route( | |||
|     )?; | ||||
| 
 | ||||
|     if let Some(event) = &body.read_receipt { | ||||
|         db.rooms.edus.room_read_set( | ||||
|         db.rooms.edus.private_read_set( | ||||
|             &body.room_id, | ||||
|             &sender_id, | ||||
|             db.rooms.get_pdu_count(event)?.ok_or(Error::BadRequest( | ||||
|                 ErrorKind::InvalidParam, | ||||
|                 "Event does not exist.", | ||||
|             ))?, | ||||
|             &db.globals, | ||||
|         )?; | ||||
| 
 | ||||
|         let mut user_receipts = BTreeMap::new(); | ||||
|  | @ -58,7 +59,7 @@ pub fn set_read_marker_route( | |||
|             }, | ||||
|         ); | ||||
| 
 | ||||
|         db.rooms.edus.roomlatest_update( | ||||
|         db.rooms.edus.readreceipt_update( | ||||
|             &sender_id, | ||||
|             &body.room_id, | ||||
|             AnyEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt( | ||||
|  |  | |||
|  | @ -81,7 +81,12 @@ pub async fn sync_events_route( | |||
|             .rev() | ||||
|             .collect::<Vec<_>>(); | ||||
| 
 | ||||
|         let send_notification_counts = !timeline_pdus.is_empty(); | ||||
|         let send_notification_counts = !timeline_pdus.is_empty() | ||||
|             || db | ||||
|                 .rooms | ||||
|                 .edus | ||||
|                 .last_privateread_update(&sender_id, &room_id)? | ||||
|                 > since; | ||||
| 
 | ||||
|         // They /sync response doesn't always return all messages, so we say the output is
 | ||||
|         // limited unless there are events in non_timeline_pdus
 | ||||
|  | @ -242,7 +247,7 @@ pub async fn sync_events_route( | |||
|         }; | ||||
| 
 | ||||
|         let notification_count = if send_notification_counts { | ||||
|             if let Some(last_read) = db.rooms.edus.room_read_get(&room_id, &sender_id)? { | ||||
|             if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_id)? { | ||||
|                 Some( | ||||
|                     (db.rooms | ||||
|                         .pdus_since(&sender_id, &room_id, last_read)? | ||||
|  | @ -280,20 +285,15 @@ pub async fn sync_events_route( | |||
|         let mut edus = db | ||||
|             .rooms | ||||
|             .edus | ||||
|             .roomlatests_since(&room_id, since)? | ||||
|             .readreceipts_since(&room_id, since)? | ||||
|             .filter_map(|r| r.ok()) // Filter out buggy events
 | ||||
|             .collect::<Vec<_>>(); | ||||
| 
 | ||||
|         if db | ||||
|             .rooms | ||||
|             .edus | ||||
|             .last_roomactive_update(&room_id, &db.globals)? | ||||
|             > since | ||||
|         { | ||||
|         if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since { | ||||
|             edus.push( | ||||
|                 serde_json::from_str( | ||||
|                     &serde_json::to_string(&AnySyncEphemeralRoomEvent::Typing( | ||||
|                         db.rooms.edus.roomactives_all(&room_id)?, | ||||
|                         db.rooms.edus.typings_all(&room_id)?, | ||||
|                     )) | ||||
|                     .expect("event is valid, we just created it"), | ||||
|                 ) | ||||
|  |  | |||
|  | @ -16,7 +16,7 @@ pub fn create_typing_event_route( | |||
|     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); | ||||
| 
 | ||||
|     if body.typing { | ||||
|         db.rooms.edus.roomactive_add( | ||||
|         db.rooms.edus.typing_add( | ||||
|             &sender_id, | ||||
|             &body.room_id, | ||||
|             body.timeout.map(|d| d.as_millis() as u64).unwrap_or(30000) | ||||
|  | @ -26,7 +26,7 @@ pub fn create_typing_event_route( | |||
|     } else { | ||||
|         db.rooms | ||||
|             .edus | ||||
|             .roomactive_remove(&sender_id, &body.room_id, &db.globals)?; | ||||
|             .typing_remove(&sender_id, &body.room_id, &db.globals)?; | ||||
|     } | ||||
| 
 | ||||
|     Ok(create_typing_event::Response.into()) | ||||
|  |  | |||
|  | @ -88,10 +88,11 @@ impl Database { | |||
|             }, | ||||
|             rooms: rooms::Rooms { | ||||
|                 edus: rooms::RoomEdus { | ||||
|                     roomuserid_lastread: db.open_tree("roomuserid_lastread")?, // "Private" read receipt
 | ||||
|                     roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest")?, // Read receipts
 | ||||
|                     roomactiveid_userid: db.open_tree("roomactiveid_userid")?, // Typing notifs
 | ||||
|                     roomid_lastroomactiveupdate: db.open_tree("roomid_lastroomactiveupdate")?, | ||||
|                     readreceiptid_readreceipt: db.open_tree("readreceiptid_readreceipt")?, | ||||
|                     roomuserid_privateread: db.open_tree("roomuserid_privateread")?, // "Private" read receipt
 | ||||
|                     roomuserid_lastprivatereadupdate: db.open_tree("roomid_lastprivatereadupdate")?, | ||||
|                     typingid_userid: db.open_tree("typingid_userid")?, | ||||
|                     roomid_lasttypingupdate: db.open_tree("roomid_lasttypingupdate")?, | ||||
|                     presenceid_presence: db.open_tree("presenceid_presence")?, | ||||
|                     userid_lastpresenceupdate: db.open_tree("userid_lastpresenceupdate")?, | ||||
|                 }, | ||||
|  | @ -163,14 +164,14 @@ impl Database { | |||
|             futures.push( | ||||
|                 self.rooms | ||||
|                     .edus | ||||
|                     .roomid_lastroomactiveupdate | ||||
|                     .roomid_lasttypingupdate | ||||
|                     .watch_prefix(&roomid_bytes), | ||||
|             ); | ||||
| 
 | ||||
|             futures.push( | ||||
|                 self.rooms | ||||
|                     .edus | ||||
|                     .roomlatestid_roomlatest | ||||
|                     .readreceiptid_readreceipt | ||||
|                     .watch_prefix(&roomid_prefix), | ||||
|             ); | ||||
| 
 | ||||
|  |  | |||
|  | @ -621,7 +621,7 @@ impl Rooms { | |||
|             } | ||||
|             _ => {} | ||||
|         } | ||||
|         self.edus.room_read_set(&room_id, &sender, index)?; | ||||
|         self.edus.private_read_set(&room_id, &sender, index, &globals)?; | ||||
| 
 | ||||
|         Ok(pdu.event_id) | ||||
|     } | ||||
|  |  | |||
|  | @ -14,17 +14,18 @@ use std::{ | |||
| }; | ||||
| 
 | ||||
| pub struct RoomEdus { | ||||
|     pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User
 | ||||
|     pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId
 | ||||
|     pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count
 | ||||
|     pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count
 | ||||
|     pub(in super::super) readreceiptid_readreceipt: sled::Tree, // ReadReceiptId = RoomId + Count + UserId
 | ||||
|     pub(in super::super) roomuserid_privateread: sled::Tree, // RoomUserId = Room + User, PrivateRead = Count
 | ||||
|     pub(in super::super) roomuserid_lastprivatereadupdate: sled::Tree, // LastPrivateReadUpdate = Count
 | ||||
|     pub(in super::super) typingid_userid: sled::Tree, // TypingId = RoomId + TimeoutTime + Count
 | ||||
|     pub(in super::super) roomid_lasttypingupdate: sled::Tree, // LastRoomTypingUpdate = Count
 | ||||
|     pub(in super::super) presenceid_presence: sled::Tree, // PresenceId = RoomId + Count + UserId
 | ||||
|     pub(in super::super) userid_lastpresenceupdate: sled::Tree, // LastPresenceUpdate = Count
 | ||||
| } | ||||
| 
 | ||||
| impl RoomEdus { | ||||
|     /// Adds an event which will be saved until a new event replaces it (e.g. read receipt).
 | ||||
|     pub fn roomlatest_update( | ||||
|     pub fn readreceipt_update( | ||||
|         &self, | ||||
|         user_id: &UserId, | ||||
|         room_id: &RoomId, | ||||
|  | @ -36,7 +37,7 @@ impl RoomEdus { | |||
| 
 | ||||
|         // Remove old entry
 | ||||
|         if let Some(old) = self | ||||
|             .roomlatestid_roomlatest | ||||
|             .readreceiptid_readreceipt | ||||
|             .scan_prefix(&prefix) | ||||
|             .keys() | ||||
|             .rev() | ||||
|  | @ -50,7 +51,7 @@ impl RoomEdus { | |||
|             }) | ||||
|         { | ||||
|             // This is the old room_latest
 | ||||
|             self.roomlatestid_roomlatest.remove(old)?; | ||||
|             self.readreceiptid_readreceipt.remove(old)?; | ||||
|         } | ||||
| 
 | ||||
|         let mut room_latest_id = prefix; | ||||
|  | @ -58,7 +59,7 @@ impl RoomEdus { | |||
|         room_latest_id.push(0xff); | ||||
|         room_latest_id.extend_from_slice(&user_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         self.roomlatestid_roomlatest.insert( | ||||
|         self.readreceiptid_readreceipt.insert( | ||||
|             room_latest_id, | ||||
|             &*serde_json::to_string(&event).expect("EduEvent::to_string always works"), | ||||
|         )?; | ||||
|  | @ -67,7 +68,7 @@ impl RoomEdus { | |||
|     } | ||||
| 
 | ||||
|     /// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
 | ||||
|     pub fn roomlatests_since( | ||||
|     pub fn readreceipts_since( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|         since: u64, | ||||
|  | @ -79,7 +80,7 @@ impl RoomEdus { | |||
|         first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
 | ||||
| 
 | ||||
|         Ok(self | ||||
|             .roomlatestid_roomlatest | ||||
|             .readreceiptid_readreceipt | ||||
|             .range(&*first_possible_edu..) | ||||
|             .filter_map(|r| r.ok()) | ||||
|             .take_while(move |(k, _)| k.starts_with(&prefix)) | ||||
|  | @ -90,9 +91,54 @@ impl RoomEdus { | |||
|             })) | ||||
|     } | ||||
| 
 | ||||
|     /// Sets a user as typing until the timeout timestamp is reached or roomactive_remove is
 | ||||
|     /// Sets a private read marker at `count`.
 | ||||
|     pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64, globals: &super::super::globals::Globals) -> Result<()> { | ||||
|         let mut key = room_id.to_string().as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(&user_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         self.roomuserid_privateread | ||||
|             .insert(&key, &count.to_be_bytes())?; | ||||
| 
 | ||||
|         self.roomuserid_lastprivatereadupdate | ||||
|             .insert(&key, &globals.next_count()?.to_be_bytes())?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Returns the private read marker.
 | ||||
|     pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> { | ||||
|         let mut key = room_id.to_string().as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(&user_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         self.roomuserid_privateread.get(key)?.map_or(Ok(None), |v| { | ||||
|             Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { | ||||
|                 Error::bad_database("Invalid private read marker bytes") | ||||
|             })?)) | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     /// Returns the count of the last typing update in this room.
 | ||||
|     pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> { | ||||
|         let mut key = room_id.to_string().as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(&user_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         Ok(self | ||||
|             .roomuserid_lastprivatereadupdate | ||||
|             .get(&key)? | ||||
|             .map_or(Ok::<_, Error>(None), |bytes| { | ||||
|                 Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { | ||||
|                     Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.") | ||||
|                 })?)) | ||||
|             })? | ||||
|             .unwrap_or(0)) | ||||
|     } | ||||
| 
 | ||||
|     /// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
 | ||||
|     /// called.
 | ||||
|     pub fn roomactive_add( | ||||
|     pub fn typing_add( | ||||
|         &self, | ||||
|         user_id: &UserId, | ||||
|         room_id: &RoomId, | ||||
|  | @ -104,22 +150,22 @@ impl RoomEdus { | |||
| 
 | ||||
|         let count = globals.next_count()?.to_be_bytes(); | ||||
| 
 | ||||
|         let mut room_active_id = prefix; | ||||
|         room_active_id.extend_from_slice(&timeout.to_be_bytes()); | ||||
|         room_active_id.push(0xff); | ||||
|         room_active_id.extend_from_slice(&count); | ||||
|         let mut room_typing_id = prefix; | ||||
|         room_typing_id.extend_from_slice(&timeout.to_be_bytes()); | ||||
|         room_typing_id.push(0xff); | ||||
|         room_typing_id.extend_from_slice(&count); | ||||
| 
 | ||||
|         self.roomactiveid_userid | ||||
|             .insert(&room_active_id, &*user_id.to_string().as_bytes())?; | ||||
|         self.typingid_userid | ||||
|             .insert(&room_typing_id, &*user_id.to_string().as_bytes())?; | ||||
| 
 | ||||
|         self.roomid_lastroomactiveupdate | ||||
|         self.roomid_lasttypingupdate | ||||
|             .insert(&room_id.to_string().as_bytes(), &count)?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Removes a user from typing before the timeout is reached.
 | ||||
|     pub fn roomactive_remove( | ||||
|     pub fn typing_remove( | ||||
|         &self, | ||||
|         user_id: &UserId, | ||||
|         room_id: &RoomId, | ||||
|  | @ -132,19 +178,19 @@ impl RoomEdus { | |||
| 
 | ||||
|         let mut found_outdated = false; | ||||
| 
 | ||||
|         // Maybe there are multiple ones from calling roomactive_add multiple times
 | ||||
|         // Maybe there are multiple ones from calling roomtyping_add multiple times
 | ||||
|         for outdated_edu in self | ||||
|             .roomactiveid_userid | ||||
|             .typingid_userid | ||||
|             .scan_prefix(&prefix) | ||||
|             .filter_map(|r| r.ok()) | ||||
|             .filter(|(_, v)| v == user_id.as_bytes()) | ||||
|         { | ||||
|             self.roomactiveid_userid.remove(outdated_edu.0)?; | ||||
|             self.typingid_userid.remove(outdated_edu.0)?; | ||||
|             found_outdated = true; | ||||
|         } | ||||
| 
 | ||||
|         if found_outdated { | ||||
|             self.roomid_lastroomactiveupdate.insert( | ||||
|             self.roomid_lasttypingupdate.insert( | ||||
|                 &room_id.to_string().as_bytes(), | ||||
|                 &globals.next_count()?.to_be_bytes(), | ||||
|             )?; | ||||
|  | @ -154,7 +200,7 @@ impl RoomEdus { | |||
|     } | ||||
| 
 | ||||
|     /// Makes sure that typing events with old timestamps get removed.
 | ||||
|     fn roomactives_maintain( | ||||
|     fn typings_maintain( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|         globals: &super::super::globals::Globals, | ||||
|  | @ -168,7 +214,7 @@ impl RoomEdus { | |||
| 
 | ||||
|         // Find all outdated edus before inserting a new one
 | ||||
|         for outdated_edu in self | ||||
|             .roomactiveid_userid | ||||
|             .typingid_userid | ||||
|             .scan_prefix(&prefix) | ||||
|             .keys() | ||||
|             .map(|key| { | ||||
|  | @ -176,21 +222,21 @@ impl RoomEdus { | |||
|                 Ok::<_, Error>(( | ||||
|                     key.clone(), | ||||
|                     utils::u64_from_bytes(key.split(|&b| b == 0xff).nth(1).ok_or_else(|| { | ||||
|                         Error::bad_database("RoomActive has invalid timestamp or delimiters.") | ||||
|                         Error::bad_database("RoomTyping has invalid timestamp or delimiters.") | ||||
|                     })?) | ||||
|                     .map_err(|_| Error::bad_database("RoomActive has invalid timestamp bytes."))?, | ||||
|                     .map_err(|_| Error::bad_database("RoomTyping has invalid timestamp bytes."))?, | ||||
|                 )) | ||||
|             }) | ||||
|             .filter_map(|r| r.ok()) | ||||
|             .take_while(|&(_, timestamp)| timestamp < current_timestamp) | ||||
|         { | ||||
|             // This is an outdated edu (time > timestamp)
 | ||||
|             self.roomactiveid_userid.remove(outdated_edu.0)?; | ||||
|             self.typingid_userid.remove(outdated_edu.0)?; | ||||
|             found_outdated = true; | ||||
|         } | ||||
| 
 | ||||
|         if found_outdated { | ||||
|             self.roomid_lastroomactiveupdate.insert( | ||||
|             self.roomid_lasttypingupdate.insert( | ||||
|                 &room_id.to_string().as_bytes(), | ||||
|                 &globals.next_count()?.to_be_bytes(), | ||||
|             )?; | ||||
|  | @ -199,16 +245,16 @@ impl RoomEdus { | |||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Returns an iterator over all active events (e.g. typing notifications).
 | ||||
|     pub fn last_roomactive_update( | ||||
|     /// Returns the count of the last typing update in this room.
 | ||||
|     pub fn last_typing_update( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|         globals: &super::super::globals::Globals, | ||||
|     ) -> Result<u64> { | ||||
|         self.roomactives_maintain(room_id, globals)?; | ||||
|         self.typings_maintain(room_id, globals)?; | ||||
| 
 | ||||
|         Ok(self | ||||
|             .roomid_lastroomactiveupdate | ||||
|             .roomid_lasttypingupdate | ||||
|             .get(&room_id.to_string().as_bytes())? | ||||
|             .map_or(Ok::<_, Error>(None), |bytes| { | ||||
|                 Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { | ||||
|  | @ -218,7 +264,7 @@ impl RoomEdus { | |||
|             .unwrap_or(0)) | ||||
|     } | ||||
| 
 | ||||
|     pub fn roomactives_all( | ||||
|     pub fn typings_all( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|     ) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> { | ||||
|  | @ -228,17 +274,15 @@ impl RoomEdus { | |||
|         let mut user_ids = Vec::new(); | ||||
| 
 | ||||
|         for user_id in self | ||||
|             .roomactiveid_userid | ||||
|             .typingid_userid | ||||
|             .scan_prefix(prefix) | ||||
|             .values() | ||||
|             .map(|user_id| { | ||||
|                 Ok::<_, Error>( | ||||
|                     UserId::try_from(utils::string_from_bytes(&user_id?).map_err(|_| { | ||||
|                         Error::bad_database("User ID in roomactiveid_userid is invalid unicode.") | ||||
|                         Error::bad_database("User ID in typingid_userid is invalid unicode.") | ||||
|                     })?) | ||||
|                     .map_err(|_| { | ||||
|                         Error::bad_database("User ID in roomactiveid_userid is invalid.") | ||||
|                     })?, | ||||
|                     .map_err(|_| Error::bad_database("User ID in typingid_userid is invalid."))?, | ||||
|                 ) | ||||
|             }) | ||||
|         { | ||||
|  | @ -250,30 +294,6 @@ impl RoomEdus { | |||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     /// Sets a private read marker at `count`.
 | ||||
|     pub fn room_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> { | ||||
|         let mut key = room_id.to_string().as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(&user_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         self.roomuserid_lastread.insert(key, &count.to_be_bytes())?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Returns the private read marker.
 | ||||
|     pub fn room_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> { | ||||
|         let mut key = room_id.to_string().as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(&user_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         self.roomuserid_lastread.get(key)?.map_or(Ok(None), |v| { | ||||
|             Ok(Some(utils::u64_from_bytes(&v).map_err(|_| { | ||||
|                 Error::bad_database("Invalid private read marker bytes") | ||||
|             })?)) | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     /// Adds a presence event which will be saved until a new event replaces it.
 | ||||
|     ///
 | ||||
|     /// Note: This method takes a RoomId because presence updates are always bound to rooms to
 | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue