base: Add support for read receipts

This commit is contained in:
Kévin Commaille 2021-05-20 17:14:57 +02:00
parent fe17dce813
commit 64b5298881
No known key found for this signature in database
GPG key ID: 296D60AE1E61661C
5 changed files with 323 additions and 8 deletions

View file

@ -42,7 +42,8 @@ use matrix_sdk_common::{
events::{
room::member::{MemberEventContent, MembershipState},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType, StateEvent,
AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType,
StateEvent,
},
identifiers::{RoomId, UserId},
instant::Instant,
@ -784,6 +785,15 @@ impl BaseClient {
)
.await?;
if let Some(event) =
new_info.ephemeral.events.iter().find_map(|e| match e.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => Some(event.content),
_ => None,
})
{
changes.add_receipts(&room_id, event);
}
if new_info.timeline.limited {
room_info.mark_members_missing();
}

View file

@ -24,6 +24,7 @@ use futures::{
use matrix_sdk_common::{
api::r0::sync::sync_events::RoomSummary as RumaSummary,
events::{
receipt::Receipt,
room::{
create::CreateEventContent, encryption::EncryptionEventContent,
guest_access::GuestAccess, history_visibility::HistoryVisibility, join_rules::JoinRule,
@ -32,7 +33,8 @@ use matrix_sdk_common::{
tag::Tags,
AnyRoomAccountDataEvent, AnyStateEventContent, AnySyncStateEvent, EventType,
},
identifiers::{MxcUri, RoomAliasId, RoomId, UserId},
identifiers::{EventId, MxcUri, RoomAliasId, RoomId, UserId},
receipt::ReceiptType,
};
use serde::{Deserialize, Serialize};
use tracing::info;
@ -449,6 +451,24 @@ impl Room {
Ok(None)
}
}
/// Get the read receipt as a `EventId` and `Receipt` tuple for the given
/// `user_id` in this room.
pub async fn user_read_receipt(
&self,
user_id: &UserId,
) -> StoreResult<Option<(EventId, Receipt)>> {
self.store.get_user_room_receipt_event(self.room_id(), ReceiptType::Read, user_id).await
}
/// Get the read receipts as a list of `UserId` and `Receipt` tuples for the
/// given `event_id` in this room.
pub async fn event_read_receipts(
&self,
event_id: &EventId,
) -> StoreResult<Vec<(UserId, Receipt)>> {
self.store.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, event_id).await
}
}
/// The underlying pure data structure for joined and left rooms.

View file

@ -22,12 +22,14 @@ use matrix_sdk_common::{
async_trait,
events::{
presence::PresenceEvent,
receipt::Receipt,
room::member::{MemberEventContent, MembershipState},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, EventType,
},
identifiers::{RoomId, UserId},
identifiers::{EventId, RoomId, UserId},
instant::Instant,
receipt::ReceiptType,
Raw,
};
use tracing::info;
@ -55,6 +57,11 @@ pub struct MemoryStore {
Arc<DashMap<RoomId, DashMap<String, DashMap<String, Raw<AnyStrippedStateEvent>>>>>,
stripped_members: Arc<DashMap<RoomId, DashMap<UserId, StrippedMemberEvent>>>,
presence: Arc<DashMap<UserId, Raw<PresenceEvent>>>,
#[allow(clippy::type_complexity)]
room_user_receipts: Arc<DashMap<RoomId, DashMap<String, DashMap<UserId, (EventId, Receipt)>>>>,
#[allow(clippy::type_complexity)]
room_event_receipts:
Arc<DashMap<RoomId, DashMap<String, DashMap<EventId, DashMap<UserId, Receipt>>>>>,
}
impl MemoryStore {
@ -76,6 +83,8 @@ impl MemoryStore {
stripped_room_state: DashMap::new().into(),
stripped_members: DashMap::new().into(),
presence: DashMap::new().into(),
room_user_receipts: DashMap::new().into(),
room_event_receipts: DashMap::new().into(),
}
}
@ -220,6 +229,43 @@ impl MemoryStore {
}
}
for (room, content) in &changes.receipts {
for (event_id, receipts) in &content.0 {
for (receipt_type, receipts) in receipts {
for (user_id, receipt) in receipts {
// Add the receipt to the room user receipts
if let Some((old_event, _)) = self
.room_user_receipts
.entry(room.clone())
.or_insert_with(DashMap::new)
.entry(receipt_type.to_string())
.or_insert_with(DashMap::new)
.insert(user_id.clone(), (event_id.clone(), receipt.clone()))
{
// Remove the old receipt from the room event receipts
if let Some(receipt_map) = self.room_event_receipts.get(room) {
if let Some(event_map) = receipt_map.get(receipt_type.as_ref()) {
if let Some(user_map) = event_map.get_mut(&old_event) {
user_map.remove(user_id);
}
}
}
}
// Add the receipt to the room event receipts
self.room_event_receipts
.entry(room.clone())
.or_insert_with(DashMap::new)
.entry(receipt_type.to_string())
.or_insert_with(DashMap::new)
.entry(event_id.clone())
.or_insert_with(DashMap::new)
.insert(user_id.clone(), receipt.clone());
}
}
}
}
info!("Saved changes in {:?}", now.elapsed());
Ok(())
@ -311,6 +357,35 @@ impl MemoryStore {
.get(room_id)
.and_then(|m| m.get(event_type.as_ref()).map(|e| e.clone())))
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
user_id: &UserId,
) -> Result<Option<(EventId, Receipt)>> {
Ok(self.room_user_receipts.get(room_id).and_then(|m| {
m.get(receipt_type.as_ref()).and_then(|m| m.get(user_id).map(|r| r.clone()))
}))
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(UserId, Receipt)>> {
Ok(self
.room_event_receipts
.get(room_id)
.and_then(|m| {
m.get(receipt_type.as_ref()).and_then(|m| {
m.get(event_id)
.map(|m| m.iter().map(|r| (r.key().clone(), r.value().clone())).collect())
})
})
.unwrap_or_else(Vec::new))
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@ -408,4 +483,22 @@ impl StateStore for MemoryStore {
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
self.get_room_account_data_event(room_id, event_type).await
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
user_id: &UserId,
) -> Result<Option<(EventId, Receipt)>> {
self.get_user_room_receipt_event(room_id, receipt_type, user_id).await
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(UserId, Receipt)>> {
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
}
}

View file

@ -25,11 +25,15 @@ use matrix_sdk_common::{
api::r0::push::get_notifications::Notification,
async_trait,
events::{
presence::PresenceEvent, room::member::MemberEventContent, AnyGlobalAccountDataEvent,
AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
presence::PresenceEvent,
receipt::{Receipt, ReceiptEventContent},
room::member::MemberEventContent,
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, EventContent, EventType,
},
identifiers::{RoomId, UserId},
identifiers::{EventId, RoomId, UserId},
locks::RwLock,
receipt::ReceiptType,
AsyncTraitDeps, Raw,
};
#[cfg(feature = "sled_state_store")]
@ -210,6 +214,41 @@ pub trait StateStore: AsyncTraitDeps {
room_id: &RoomId,
event_type: EventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>>;
/// Get an event out of the user room receipt store.
///
/// # Arguments
///
/// * `room_id` - The id of the room for which the receipt should be
/// fetched.
///
/// * `receipt_type` - The type of the receipt.
///
/// * `user_id` - The id of the user for who the receipt should be fetched.
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
user_id: &UserId,
) -> Result<Option<(EventId, Receipt)>>;
/// Get events out of the event room receipt store.
///
/// # Arguments
///
/// * `room_id` - The id of the room for which the receipts should be
/// fetched.
///
/// * `receipt_type` - The type of the receipts.
///
/// * `event_id` - The id of the event for which the receipts should be
/// fetched.
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(UserId, Receipt)>>;
}
/// A state store wrapper for the SDK.
@ -369,6 +408,8 @@ pub struct StateChanges {
pub room_account_data: BTreeMap<RoomId, BTreeMap<String, Raw<AnyRoomAccountDataEvent>>>,
/// A map of `RoomId` to `RoomInfo`.
pub room_infos: BTreeMap<RoomId, RoomInfo>,
/// A map of `RoomId` to `ReceiptEventContent`.
pub receipts: BTreeMap<RoomId, ReceiptEventContent>,
/// A mapping of `RoomId` to a map of event type to a map of state key to
/// `AnyStrippedStateEvent`.
@ -462,4 +503,10 @@ impl StateChanges {
pub fn add_notification(&mut self, room_id: &RoomId, notification: Notification) {
self.notifications.entry(room_id.to_owned()).or_insert_with(Vec::new).push(notification);
}
/// Update the `StateChanges` struct with the given room with a new
/// `Receipts`.
pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
self.receipts.insert(room_id.to_owned(), event);
}
}

View file

@ -16,7 +16,7 @@ mod store_key;
use std::{
collections::BTreeSet,
convert::TryFrom,
convert::{TryFrom, TryInto},
path::{Path, PathBuf},
sync::Arc,
time::Instant,
@ -30,10 +30,12 @@ use matrix_sdk_common::{
async_trait,
events::{
presence::PresenceEvent,
receipt::Receipt,
room::member::{MemberEventContent, MembershipState},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType,
},
identifiers::{RoomId, UserId},
identifiers::{EventId, RoomId, UserId},
receipt::ReceiptType,
Raw,
};
use serde::{Deserialize, Serialize};
@ -127,12 +129,45 @@ impl EncodeKey for (&str, &str, &str) {
}
}
impl EncodeKey for (&str, &str, &str, &str) {
fn encode(&self) -> Vec<u8> {
[
self.0.as_bytes(),
&[ENCODE_SEPARATOR],
self.1.as_bytes(),
&[ENCODE_SEPARATOR],
self.2.as_bytes(),
&[ENCODE_SEPARATOR],
self.3.as_bytes(),
&[ENCODE_SEPARATOR],
]
.concat()
}
}
impl EncodeKey for EventType {
fn encode(&self) -> Vec<u8> {
self.as_str().encode()
}
}
/// Get the value at `position` in encoded `key`.
///
/// The key must have been encoded with the `EncodeKey` trait. `position`
/// corresponds to the position in the tuple before the key was encoded. If it
/// wasn't encoded in a tuple, use `0`.
///
/// Returns `None` if there is no key at `position`.
pub fn decode_key_value(key: &[u8], position: usize) -> Option<String> {
let values: Vec<&[u8]> = key.split(|v| *v == ENCODE_SEPARATOR).collect();
if position >= values.len() {
return None;
}
Some(String::from_utf8_lossy(values[position]).to_string())
}
#[derive(Clone)]
pub struct SledStore {
path: Option<PathBuf>,
@ -152,6 +187,8 @@ pub struct SledStore {
stripped_room_state: Tree,
stripped_members: Tree,
presence: Tree,
room_user_receipts: Tree,
room_event_receipts: Tree,
}
impl std::fmt::Debug for SledStore {
@ -184,6 +221,9 @@ impl SledStore {
let stripped_members = db.open_tree("stripped_members")?;
let stripped_room_state = db.open_tree("stripped_room_state")?;
let room_user_receipts = db.open_tree("room_user_receipts")?;
let room_event_receipts = db.open_tree("room_event_receipts")?;
Ok(Self {
path,
inner: db,
@ -202,6 +242,8 @@ impl SledStore {
stripped_room_info,
stripped_members,
stripped_room_state,
room_user_receipts,
room_event_receipts,
})
}
@ -459,6 +501,58 @@ impl SledStore {
ret?;
let ret: Result<(), TransactionError<SerializationError>> =
(&self.room_user_receipts, &self.room_event_receipts).transaction(
|(room_user_receipts, room_event_receipts)| {
for (room, content) in &changes.receipts {
for (event_id, receipts) in &content.0 {
for (receipt_type, receipts) in receipts {
for (user_id, receipt) in receipts {
// Add the receipt to the room user receipts
if let Some(old) = room_user_receipts.insert(
(room.as_str(), receipt_type.as_ref(), user_id.as_str())
.encode(),
self.serialize_event(&(event_id, receipt))
.map_err(ConflictableTransactionError::Abort)?,
)? {
// Remove the old receipt from the room event receipts
let (old_event, _): (EventId, Receipt) = self
.deserialize_event(&old)
.map_err(ConflictableTransactionError::Abort)?;
room_event_receipts.remove(
(
room.as_str(),
receipt_type.as_ref(),
old_event.as_str(),
user_id.as_str(),
)
.encode(),
)?;
}
// Add the receipt to the room event receipts
room_event_receipts.insert(
(
room.as_str(),
receipt_type.as_ref(),
event_id.as_str(),
user_id.as_str(),
)
.encode(),
self.serialize_event(receipt)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
}
}
}
Ok(())
},
);
ret?;
self.inner.flush_async().await?;
info!("Saved changes in {:?}", now.elapsed());
@ -598,6 +692,39 @@ impl SledStore {
.map(|m| self.deserialize_event(&m))
.transpose()?)
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
user_id: &UserId,
) -> Result<Option<(EventId, Receipt)>> {
Ok(self
.room_user_receipts
.get((room_id.as_str(), receipt_type.as_ref(), user_id.as_str()).encode())?
.map(|m| self.deserialize_event(&m))
.transpose()?)
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(UserId, Receipt)>> {
self.room_event_receipts
.scan_prefix((room_id.as_str(), receipt_type.as_ref(), event_id.as_str()).encode())
.map(|u| {
u.map_err(StoreError::Sled).and_then(|(key, value)| {
self.deserialize_event(&value)
.map(|receipt| {
(decode_key_value(&key, 3).unwrap().try_into().unwrap(), receipt)
})
.map_err(Into::into)
})
})
.collect()
}
}
#[async_trait]
@ -689,6 +816,24 @@ impl StateStore for SledStore {
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
self.get_room_account_data_event(room_id, event_type).await
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
user_id: &UserId,
) -> Result<Option<(EventId, Receipt)>> {
self.get_user_room_receipt_event(room_id, receipt_type, user_id).await
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(UserId, Receipt)>> {
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
}
}
#[cfg(test)]