From 64b5298881539310d4d22faa6b94832be23da160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Commaille?= Date: Thu, 20 May 2021 17:14:57 +0200 Subject: [PATCH] base: Add support for read receipts --- matrix_sdk_base/src/client.rs | 12 +- matrix_sdk_base/src/rooms/normal.rs | 22 ++- matrix_sdk_base/src/store/memory_store.rs | 95 ++++++++++++- matrix_sdk_base/src/store/mod.rs | 53 ++++++- matrix_sdk_base/src/store/sled_store/mod.rs | 149 +++++++++++++++++++- 5 files changed, 323 insertions(+), 8 deletions(-) diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index d24a2f1c..b326363f 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -42,7 +42,8 @@ use matrix_sdk_common::{ events::{ room::member::{MemberEventContent, MembershipState}, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, - AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType, StateEvent, + AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType, + StateEvent, }, identifiers::{RoomId, UserId}, instant::Instant, @@ -784,6 +785,15 @@ impl BaseClient { ) .await?; + if let Some(event) = + new_info.ephemeral.events.iter().find_map(|e| match e.deserialize() { + Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => Some(event.content), + _ => None, + }) + { + changes.add_receipts(&room_id, event); + } + if new_info.timeline.limited { room_info.mark_members_missing(); } diff --git a/matrix_sdk_base/src/rooms/normal.rs b/matrix_sdk_base/src/rooms/normal.rs index e86f51a6..42dec411 100644 --- a/matrix_sdk_base/src/rooms/normal.rs +++ b/matrix_sdk_base/src/rooms/normal.rs @@ -24,6 +24,7 @@ use futures::{ use matrix_sdk_common::{ api::r0::sync::sync_events::RoomSummary as RumaSummary, events::{ + receipt::Receipt, room::{ create::CreateEventContent, encryption::EncryptionEventContent, guest_access::GuestAccess, history_visibility::HistoryVisibility, join_rules::JoinRule, @@ -32,7 +33,8 @@ use matrix_sdk_common::{ tag::Tags, AnyRoomAccountDataEvent, AnyStateEventContent, AnySyncStateEvent, EventType, }, - identifiers::{MxcUri, RoomAliasId, RoomId, UserId}, + identifiers::{EventId, MxcUri, RoomAliasId, RoomId, UserId}, + receipt::ReceiptType, }; use serde::{Deserialize, Serialize}; use tracing::info; @@ -449,6 +451,24 @@ impl Room { Ok(None) } } + + /// Get the read receipt as a `EventId` and `Receipt` tuple for the given + /// `user_id` in this room. + pub async fn user_read_receipt( + &self, + user_id: &UserId, + ) -> StoreResult> { + self.store.get_user_room_receipt_event(self.room_id(), ReceiptType::Read, user_id).await + } + + /// Get the read receipts as a list of `UserId` and `Receipt` tuples for the + /// given `event_id` in this room. + pub async fn event_read_receipts( + &self, + event_id: &EventId, + ) -> StoreResult> { + self.store.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, event_id).await + } } /// The underlying pure data structure for joined and left rooms. diff --git a/matrix_sdk_base/src/store/memory_store.rs b/matrix_sdk_base/src/store/memory_store.rs index 51c3ea33..1e93d1d0 100644 --- a/matrix_sdk_base/src/store/memory_store.rs +++ b/matrix_sdk_base/src/store/memory_store.rs @@ -22,12 +22,14 @@ use matrix_sdk_common::{ async_trait, events::{ presence::PresenceEvent, + receipt::Receipt, room::member::{MemberEventContent, MembershipState}, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventType, }, - identifiers::{RoomId, UserId}, + identifiers::{EventId, RoomId, UserId}, instant::Instant, + receipt::ReceiptType, Raw, }; use tracing::info; @@ -55,6 +57,11 @@ pub struct MemoryStore { Arc>>>>, stripped_members: Arc>>, presence: Arc>>, + #[allow(clippy::type_complexity)] + room_user_receipts: Arc>>>, + #[allow(clippy::type_complexity)] + room_event_receipts: + Arc>>>>, } impl MemoryStore { @@ -76,6 +83,8 @@ impl MemoryStore { stripped_room_state: DashMap::new().into(), stripped_members: DashMap::new().into(), presence: DashMap::new().into(), + room_user_receipts: DashMap::new().into(), + room_event_receipts: DashMap::new().into(), } } @@ -220,6 +229,43 @@ impl MemoryStore { } } + for (room, content) in &changes.receipts { + for (event_id, receipts) in &content.0 { + for (receipt_type, receipts) in receipts { + for (user_id, receipt) in receipts { + // Add the receipt to the room user receipts + if let Some((old_event, _)) = self + .room_user_receipts + .entry(room.clone()) + .or_insert_with(DashMap::new) + .entry(receipt_type.to_string()) + .or_insert_with(DashMap::new) + .insert(user_id.clone(), (event_id.clone(), receipt.clone())) + { + // Remove the old receipt from the room event receipts + if let Some(receipt_map) = self.room_event_receipts.get(room) { + if let Some(event_map) = receipt_map.get(receipt_type.as_ref()) { + if let Some(user_map) = event_map.get_mut(&old_event) { + user_map.remove(user_id); + } + } + } + } + + // Add the receipt to the room event receipts + self.room_event_receipts + .entry(room.clone()) + .or_insert_with(DashMap::new) + .entry(receipt_type.to_string()) + .or_insert_with(DashMap::new) + .entry(event_id.clone()) + .or_insert_with(DashMap::new) + .insert(user_id.clone(), receipt.clone()); + } + } + } + } + info!("Saved changes in {:?}", now.elapsed()); Ok(()) @@ -311,6 +357,35 @@ impl MemoryStore { .get(room_id) .and_then(|m| m.get(event_type.as_ref()).map(|e| e.clone()))) } + + async fn get_user_room_receipt_event( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + user_id: &UserId, + ) -> Result> { + Ok(self.room_user_receipts.get(room_id).and_then(|m| { + m.get(receipt_type.as_ref()).and_then(|m| m.get(user_id).map(|r| r.clone())) + })) + } + + async fn get_event_room_receipt_events( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + event_id: &EventId, + ) -> Result> { + Ok(self + .room_event_receipts + .get(room_id) + .and_then(|m| { + m.get(receipt_type.as_ref()).and_then(|m| { + m.get(event_id) + .map(|m| m.iter().map(|r| (r.key().clone(), r.value().clone())).collect()) + }) + }) + .unwrap_or_else(Vec::new)) + } } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -408,4 +483,22 @@ impl StateStore for MemoryStore { ) -> Result>> { self.get_room_account_data_event(room_id, event_type).await } + + async fn get_user_room_receipt_event( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + user_id: &UserId, + ) -> Result> { + self.get_user_room_receipt_event(room_id, receipt_type, user_id).await + } + + async fn get_event_room_receipt_events( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + event_id: &EventId, + ) -> Result> { + self.get_event_room_receipt_events(room_id, receipt_type, event_id).await + } } diff --git a/matrix_sdk_base/src/store/mod.rs b/matrix_sdk_base/src/store/mod.rs index a9b20dd0..2ef72014 100644 --- a/matrix_sdk_base/src/store/mod.rs +++ b/matrix_sdk_base/src/store/mod.rs @@ -25,11 +25,15 @@ use matrix_sdk_common::{ api::r0::push::get_notifications::Notification, async_trait, events::{ - presence::PresenceEvent, room::member::MemberEventContent, AnyGlobalAccountDataEvent, - AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType, + presence::PresenceEvent, + receipt::{Receipt, ReceiptEventContent}, + room::member::MemberEventContent, + AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent, + AnySyncStateEvent, EventContent, EventType, }, - identifiers::{RoomId, UserId}, + identifiers::{EventId, RoomId, UserId}, locks::RwLock, + receipt::ReceiptType, AsyncTraitDeps, Raw, }; #[cfg(feature = "sled_state_store")] @@ -210,6 +214,41 @@ pub trait StateStore: AsyncTraitDeps { room_id: &RoomId, event_type: EventType, ) -> Result>>; + + /// Get an event out of the user room receipt store. + /// + /// # Arguments + /// + /// * `room_id` - The id of the room for which the receipt should be + /// fetched. + /// + /// * `receipt_type` - The type of the receipt. + /// + /// * `user_id` - The id of the user for who the receipt should be fetched. + async fn get_user_room_receipt_event( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + user_id: &UserId, + ) -> Result>; + + /// Get events out of the event room receipt store. + /// + /// # Arguments + /// + /// * `room_id` - The id of the room for which the receipts should be + /// fetched. + /// + /// * `receipt_type` - The type of the receipts. + /// + /// * `event_id` - The id of the event for which the receipts should be + /// fetched. + async fn get_event_room_receipt_events( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + event_id: &EventId, + ) -> Result>; } /// A state store wrapper for the SDK. @@ -369,6 +408,8 @@ pub struct StateChanges { pub room_account_data: BTreeMap>>, /// A map of `RoomId` to `RoomInfo`. pub room_infos: BTreeMap, + /// A map of `RoomId` to `ReceiptEventContent`. + pub receipts: BTreeMap, /// A mapping of `RoomId` to a map of event type to a map of state key to /// `AnyStrippedStateEvent`. @@ -462,4 +503,10 @@ impl StateChanges { pub fn add_notification(&mut self, room_id: &RoomId, notification: Notification) { self.notifications.entry(room_id.to_owned()).or_insert_with(Vec::new).push(notification); } + + /// Update the `StateChanges` struct with the given room with a new + /// `Receipts`. + pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) { + self.receipts.insert(room_id.to_owned(), event); + } } diff --git a/matrix_sdk_base/src/store/sled_store/mod.rs b/matrix_sdk_base/src/store/sled_store/mod.rs index c60534bc..8f474d60 100644 --- a/matrix_sdk_base/src/store/sled_store/mod.rs +++ b/matrix_sdk_base/src/store/sled_store/mod.rs @@ -16,7 +16,7 @@ mod store_key; use std::{ collections::BTreeSet, - convert::TryFrom, + convert::{TryFrom, TryInto}, path::{Path, PathBuf}, sync::Arc, time::Instant, @@ -30,10 +30,12 @@ use matrix_sdk_common::{ async_trait, events::{ presence::PresenceEvent, + receipt::Receipt, room::member::{MemberEventContent, MembershipState}, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType, }, - identifiers::{RoomId, UserId}, + identifiers::{EventId, RoomId, UserId}, + receipt::ReceiptType, Raw, }; use serde::{Deserialize, Serialize}; @@ -127,12 +129,45 @@ impl EncodeKey for (&str, &str, &str) { } } +impl EncodeKey for (&str, &str, &str, &str) { + fn encode(&self) -> Vec { + [ + self.0.as_bytes(), + &[ENCODE_SEPARATOR], + self.1.as_bytes(), + &[ENCODE_SEPARATOR], + self.2.as_bytes(), + &[ENCODE_SEPARATOR], + self.3.as_bytes(), + &[ENCODE_SEPARATOR], + ] + .concat() + } +} + impl EncodeKey for EventType { fn encode(&self) -> Vec { self.as_str().encode() } } +/// Get the value at `position` in encoded `key`. +/// +/// The key must have been encoded with the `EncodeKey` trait. `position` +/// corresponds to the position in the tuple before the key was encoded. If it +/// wasn't encoded in a tuple, use `0`. +/// +/// Returns `None` if there is no key at `position`. +pub fn decode_key_value(key: &[u8], position: usize) -> Option { + let values: Vec<&[u8]> = key.split(|v| *v == ENCODE_SEPARATOR).collect(); + + if position >= values.len() { + return None; + } + + Some(String::from_utf8_lossy(values[position]).to_string()) +} + #[derive(Clone)] pub struct SledStore { path: Option, @@ -152,6 +187,8 @@ pub struct SledStore { stripped_room_state: Tree, stripped_members: Tree, presence: Tree, + room_user_receipts: Tree, + room_event_receipts: Tree, } impl std::fmt::Debug for SledStore { @@ -184,6 +221,9 @@ impl SledStore { let stripped_members = db.open_tree("stripped_members")?; let stripped_room_state = db.open_tree("stripped_room_state")?; + let room_user_receipts = db.open_tree("room_user_receipts")?; + let room_event_receipts = db.open_tree("room_event_receipts")?; + Ok(Self { path, inner: db, @@ -202,6 +242,8 @@ impl SledStore { stripped_room_info, stripped_members, stripped_room_state, + room_user_receipts, + room_event_receipts, }) } @@ -459,6 +501,58 @@ impl SledStore { ret?; + let ret: Result<(), TransactionError> = + (&self.room_user_receipts, &self.room_event_receipts).transaction( + |(room_user_receipts, room_event_receipts)| { + for (room, content) in &changes.receipts { + for (event_id, receipts) in &content.0 { + for (receipt_type, receipts) in receipts { + for (user_id, receipt) in receipts { + // Add the receipt to the room user receipts + if let Some(old) = room_user_receipts.insert( + (room.as_str(), receipt_type.as_ref(), user_id.as_str()) + .encode(), + self.serialize_event(&(event_id, receipt)) + .map_err(ConflictableTransactionError::Abort)?, + )? { + // Remove the old receipt from the room event receipts + let (old_event, _): (EventId, Receipt) = self + .deserialize_event(&old) + .map_err(ConflictableTransactionError::Abort)?; + room_event_receipts.remove( + ( + room.as_str(), + receipt_type.as_ref(), + old_event.as_str(), + user_id.as_str(), + ) + .encode(), + )?; + } + + // Add the receipt to the room event receipts + room_event_receipts.insert( + ( + room.as_str(), + receipt_type.as_ref(), + event_id.as_str(), + user_id.as_str(), + ) + .encode(), + self.serialize_event(receipt) + .map_err(ConflictableTransactionError::Abort)?, + )?; + } + } + } + } + + Ok(()) + }, + ); + + ret?; + self.inner.flush_async().await?; info!("Saved changes in {:?}", now.elapsed()); @@ -598,6 +692,39 @@ impl SledStore { .map(|m| self.deserialize_event(&m)) .transpose()?) } + + async fn get_user_room_receipt_event( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + user_id: &UserId, + ) -> Result> { + Ok(self + .room_user_receipts + .get((room_id.as_str(), receipt_type.as_ref(), user_id.as_str()).encode())? + .map(|m| self.deserialize_event(&m)) + .transpose()?) + } + + async fn get_event_room_receipt_events( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + event_id: &EventId, + ) -> Result> { + self.room_event_receipts + .scan_prefix((room_id.as_str(), receipt_type.as_ref(), event_id.as_str()).encode()) + .map(|u| { + u.map_err(StoreError::Sled).and_then(|(key, value)| { + self.deserialize_event(&value) + .map(|receipt| { + (decode_key_value(&key, 3).unwrap().try_into().unwrap(), receipt) + }) + .map_err(Into::into) + }) + }) + .collect() + } } #[async_trait] @@ -689,6 +816,24 @@ impl StateStore for SledStore { ) -> Result>> { self.get_room_account_data_event(room_id, event_type).await } + + async fn get_user_room_receipt_event( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + user_id: &UserId, + ) -> Result> { + self.get_user_room_receipt_event(room_id, receipt_type, user_id).await + } + + async fn get_event_room_receipt_events( + &self, + room_id: &RoomId, + receipt_type: ReceiptType, + event_id: &EventId, + ) -> Result> { + self.get_event_room_receipt_events(room_id, receipt_type, event_id).await + } } #[cfg(test)]