Merge branch 'read-receipts'
commit
bdd51a323a
|
@ -42,7 +42,8 @@ use matrix_sdk_common::{
|
||||||
events::{
|
events::{
|
||||||
room::member::{MemberEventContent, MembershipState},
|
room::member::{MemberEventContent, MembershipState},
|
||||||
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
|
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
|
||||||
AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType, StateEvent,
|
AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent, EventContent, EventType,
|
||||||
|
StateEvent,
|
||||||
},
|
},
|
||||||
identifiers::{RoomId, UserId},
|
identifiers::{RoomId, UserId},
|
||||||
instant::Instant,
|
instant::Instant,
|
||||||
|
@ -784,6 +785,15 @@ impl BaseClient {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
if let Some(event) =
|
||||||
|
new_info.ephemeral.events.iter().find_map(|e| match e.deserialize() {
|
||||||
|
Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => Some(event.content),
|
||||||
|
_ => None,
|
||||||
|
})
|
||||||
|
{
|
||||||
|
changes.add_receipts(&room_id, event);
|
||||||
|
}
|
||||||
|
|
||||||
if new_info.timeline.limited {
|
if new_info.timeline.limited {
|
||||||
room_info.mark_members_missing();
|
room_info.mark_members_missing();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ use futures::{
|
||||||
use matrix_sdk_common::{
|
use matrix_sdk_common::{
|
||||||
api::r0::sync::sync_events::RoomSummary as RumaSummary,
|
api::r0::sync::sync_events::RoomSummary as RumaSummary,
|
||||||
events::{
|
events::{
|
||||||
|
receipt::Receipt,
|
||||||
room::{
|
room::{
|
||||||
create::CreateEventContent, encryption::EncryptionEventContent,
|
create::CreateEventContent, encryption::EncryptionEventContent,
|
||||||
guest_access::GuestAccess, history_visibility::HistoryVisibility, join_rules::JoinRule,
|
guest_access::GuestAccess, history_visibility::HistoryVisibility, join_rules::JoinRule,
|
||||||
|
@ -32,7 +33,8 @@ use matrix_sdk_common::{
|
||||||
tag::Tags,
|
tag::Tags,
|
||||||
AnyRoomAccountDataEvent, AnyStateEventContent, AnySyncStateEvent, EventType,
|
AnyRoomAccountDataEvent, AnyStateEventContent, AnySyncStateEvent, EventType,
|
||||||
},
|
},
|
||||||
identifiers::{MxcUri, RoomAliasId, RoomId, UserId},
|
identifiers::{EventId, MxcUri, RoomAliasId, RoomId, UserId},
|
||||||
|
receipt::ReceiptType,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
@ -449,6 +451,24 @@ impl Room {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the read receipt as a `EventId` and `Receipt` tuple for the given
|
||||||
|
/// `user_id` in this room.
|
||||||
|
pub async fn user_read_receipt(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
) -> StoreResult<Option<(EventId, Receipt)>> {
|
||||||
|
self.store.get_user_room_receipt_event(self.room_id(), ReceiptType::Read, user_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the read receipts as a list of `UserId` and `Receipt` tuples for the
|
||||||
|
/// given `event_id` in this room.
|
||||||
|
pub async fn event_read_receipts(
|
||||||
|
&self,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> StoreResult<Vec<(UserId, Receipt)>> {
|
||||||
|
self.store.get_event_room_receipt_events(self.room_id(), ReceiptType::Read, event_id).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The underlying pure data structure for joined and left rooms.
|
/// The underlying pure data structure for joined and left rooms.
|
||||||
|
|
|
@ -22,12 +22,14 @@ use matrix_sdk_common::{
|
||||||
async_trait,
|
async_trait,
|
||||||
events::{
|
events::{
|
||||||
presence::PresenceEvent,
|
presence::PresenceEvent,
|
||||||
|
receipt::Receipt,
|
||||||
room::member::{MemberEventContent, MembershipState},
|
room::member::{MemberEventContent, MembershipState},
|
||||||
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
|
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
|
||||||
AnySyncStateEvent, EventType,
|
AnySyncStateEvent, EventType,
|
||||||
},
|
},
|
||||||
identifiers::{RoomId, UserId},
|
identifiers::{EventId, RoomId, UserId},
|
||||||
instant::Instant,
|
instant::Instant,
|
||||||
|
receipt::ReceiptType,
|
||||||
Raw,
|
Raw,
|
||||||
};
|
};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
@ -55,6 +57,11 @@ pub struct MemoryStore {
|
||||||
Arc<DashMap<RoomId, DashMap<String, DashMap<String, Raw<AnyStrippedStateEvent>>>>>,
|
Arc<DashMap<RoomId, DashMap<String, DashMap<String, Raw<AnyStrippedStateEvent>>>>>,
|
||||||
stripped_members: Arc<DashMap<RoomId, DashMap<UserId, StrippedMemberEvent>>>,
|
stripped_members: Arc<DashMap<RoomId, DashMap<UserId, StrippedMemberEvent>>>,
|
||||||
presence: Arc<DashMap<UserId, Raw<PresenceEvent>>>,
|
presence: Arc<DashMap<UserId, Raw<PresenceEvent>>>,
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
|
room_user_receipts: Arc<DashMap<RoomId, DashMap<String, DashMap<UserId, (EventId, Receipt)>>>>,
|
||||||
|
#[allow(clippy::type_complexity)]
|
||||||
|
room_event_receipts:
|
||||||
|
Arc<DashMap<RoomId, DashMap<String, DashMap<EventId, DashMap<UserId, Receipt>>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MemoryStore {
|
impl MemoryStore {
|
||||||
|
@ -76,6 +83,8 @@ impl MemoryStore {
|
||||||
stripped_room_state: DashMap::new().into(),
|
stripped_room_state: DashMap::new().into(),
|
||||||
stripped_members: DashMap::new().into(),
|
stripped_members: DashMap::new().into(),
|
||||||
presence: DashMap::new().into(),
|
presence: DashMap::new().into(),
|
||||||
|
room_user_receipts: DashMap::new().into(),
|
||||||
|
room_event_receipts: DashMap::new().into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,6 +229,43 @@ impl MemoryStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (room, content) in &changes.receipts {
|
||||||
|
for (event_id, receipts) in &content.0 {
|
||||||
|
for (receipt_type, receipts) in receipts {
|
||||||
|
for (user_id, receipt) in receipts {
|
||||||
|
// Add the receipt to the room user receipts
|
||||||
|
if let Some((old_event, _)) = self
|
||||||
|
.room_user_receipts
|
||||||
|
.entry(room.clone())
|
||||||
|
.or_insert_with(DashMap::new)
|
||||||
|
.entry(receipt_type.to_string())
|
||||||
|
.or_insert_with(DashMap::new)
|
||||||
|
.insert(user_id.clone(), (event_id.clone(), receipt.clone()))
|
||||||
|
{
|
||||||
|
// Remove the old receipt from the room event receipts
|
||||||
|
if let Some(receipt_map) = self.room_event_receipts.get(room) {
|
||||||
|
if let Some(event_map) = receipt_map.get(receipt_type.as_ref()) {
|
||||||
|
if let Some(user_map) = event_map.get_mut(&old_event) {
|
||||||
|
user_map.remove(user_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the receipt to the room event receipts
|
||||||
|
self.room_event_receipts
|
||||||
|
.entry(room.clone())
|
||||||
|
.or_insert_with(DashMap::new)
|
||||||
|
.entry(receipt_type.to_string())
|
||||||
|
.or_insert_with(DashMap::new)
|
||||||
|
.entry(event_id.clone())
|
||||||
|
.or_insert_with(DashMap::new)
|
||||||
|
.insert(user_id.clone(), receipt.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!("Saved changes in {:?}", now.elapsed());
|
info!("Saved changes in {:?}", now.elapsed());
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -311,6 +357,35 @@ impl MemoryStore {
|
||||||
.get(room_id)
|
.get(room_id)
|
||||||
.and_then(|m| m.get(event_type.as_ref()).map(|e| e.clone())))
|
.and_then(|m| m.get(event_type.as_ref()).map(|e| e.clone())))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_user_room_receipt_event(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
user_id: &UserId,
|
||||||
|
) -> Result<Option<(EventId, Receipt)>> {
|
||||||
|
Ok(self.room_user_receipts.get(room_id).and_then(|m| {
|
||||||
|
m.get(receipt_type.as_ref()).and_then(|m| m.get(user_id).map(|r| r.clone()))
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_event_room_receipt_events(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> Result<Vec<(UserId, Receipt)>> {
|
||||||
|
Ok(self
|
||||||
|
.room_event_receipts
|
||||||
|
.get(room_id)
|
||||||
|
.and_then(|m| {
|
||||||
|
m.get(receipt_type.as_ref()).and_then(|m| {
|
||||||
|
m.get(event_id)
|
||||||
|
.map(|m| m.iter().map(|r| (r.key().clone(), r.value().clone())).collect())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.unwrap_or_else(Vec::new))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||||
|
@ -408,4 +483,133 @@ impl StateStore for MemoryStore {
|
||||||
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
|
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
|
||||||
self.get_room_account_data_event(room_id, event_type).await
|
self.get_room_account_data_event(room_id, event_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_user_room_receipt_event(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
user_id: &UserId,
|
||||||
|
) -> Result<Option<(EventId, Receipt)>> {
|
||||||
|
self.get_user_room_receipt_event(room_id, receipt_type, user_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_event_room_receipt_events(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> Result<Vec<(UserId, Receipt)>> {
|
||||||
|
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[cfg(not(feature = "sled_state_store"))]
|
||||||
|
mod test {
|
||||||
|
use matrix_sdk_common::{
|
||||||
|
identifiers::{event_id, room_id, user_id},
|
||||||
|
receipt::ReceiptType,
|
||||||
|
};
|
||||||
|
use matrix_sdk_test::async_test;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::{MemoryStore, StateChanges};
|
||||||
|
|
||||||
|
fn user_id() -> UserId {
|
||||||
|
user_id!("@example:localhost")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_test]
|
||||||
|
async fn test_receipts_saving() {
|
||||||
|
let store = MemoryStore::new();
|
||||||
|
|
||||||
|
let room_id = room_id!("!test:localhost");
|
||||||
|
|
||||||
|
let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
|
||||||
|
let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
|
||||||
|
|
||||||
|
let first_receipt_event = serde_json::from_value(json!({
|
||||||
|
first_event_id.clone(): {
|
||||||
|
"m.read": {
|
||||||
|
user_id(): {
|
||||||
|
"ts": 1436451550453u64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let second_receipt_event = serde_json::from_value(json!({
|
||||||
|
second_event_id.clone(): {
|
||||||
|
"m.read": {
|
||||||
|
user_id(): {
|
||||||
|
"ts": 1436451551453u64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(store
|
||||||
|
.get_user_room_receipt_event(&room_id, ReceiptType::Read, &user_id())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &first_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &second_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
|
||||||
|
let mut changes = StateChanges::default();
|
||||||
|
changes.add_receipts(&room_id, first_receipt_event);
|
||||||
|
|
||||||
|
store.save_changes(&changes).await.unwrap();
|
||||||
|
assert!(store
|
||||||
|
.get_user_room_receipt_event(&room_id, ReceiptType::Read, &user_id())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_some(),);
|
||||||
|
assert_eq!(
|
||||||
|
store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &first_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.len(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &second_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
|
||||||
|
let mut changes = StateChanges::default();
|
||||||
|
changes.add_receipts(&room_id, second_receipt_event);
|
||||||
|
|
||||||
|
store.save_changes(&changes).await.unwrap();
|
||||||
|
assert!(store
|
||||||
|
.get_user_room_receipt_event(&room_id, ReceiptType::Read, &user_id())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_some());
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &first_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
assert_eq!(
|
||||||
|
store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &second_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.len(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,11 +25,15 @@ use matrix_sdk_common::{
|
||||||
api::r0::push::get_notifications::Notification,
|
api::r0::push::get_notifications::Notification,
|
||||||
async_trait,
|
async_trait,
|
||||||
events::{
|
events::{
|
||||||
presence::PresenceEvent, room::member::MemberEventContent, AnyGlobalAccountDataEvent,
|
presence::PresenceEvent,
|
||||||
AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, EventContent, EventType,
|
receipt::{Receipt, ReceiptEventContent},
|
||||||
|
room::member::MemberEventContent,
|
||||||
|
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
|
||||||
|
AnySyncStateEvent, EventContent, EventType,
|
||||||
},
|
},
|
||||||
identifiers::{RoomId, UserId},
|
identifiers::{EventId, RoomId, UserId},
|
||||||
locks::RwLock,
|
locks::RwLock,
|
||||||
|
receipt::ReceiptType,
|
||||||
AsyncTraitDeps, Raw,
|
AsyncTraitDeps, Raw,
|
||||||
};
|
};
|
||||||
#[cfg(feature = "sled_state_store")]
|
#[cfg(feature = "sled_state_store")]
|
||||||
|
@ -210,6 +214,41 @@ pub trait StateStore: AsyncTraitDeps {
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
event_type: EventType,
|
event_type: EventType,
|
||||||
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>>;
|
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>>;
|
||||||
|
|
||||||
|
/// Get an event out of the user room receipt store.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `room_id` - The id of the room for which the receipt should be
|
||||||
|
/// fetched.
|
||||||
|
///
|
||||||
|
/// * `receipt_type` - The type of the receipt.
|
||||||
|
///
|
||||||
|
/// * `user_id` - The id of the user for who the receipt should be fetched.
|
||||||
|
async fn get_user_room_receipt_event(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
user_id: &UserId,
|
||||||
|
) -> Result<Option<(EventId, Receipt)>>;
|
||||||
|
|
||||||
|
/// Get events out of the event room receipt store.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `room_id` - The id of the room for which the receipts should be
|
||||||
|
/// fetched.
|
||||||
|
///
|
||||||
|
/// * `receipt_type` - The type of the receipts.
|
||||||
|
///
|
||||||
|
/// * `event_id` - The id of the event for which the receipts should be
|
||||||
|
/// fetched.
|
||||||
|
async fn get_event_room_receipt_events(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> Result<Vec<(UserId, Receipt)>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A state store wrapper for the SDK.
|
/// A state store wrapper for the SDK.
|
||||||
|
@ -366,6 +405,8 @@ pub struct StateChanges {
|
||||||
pub room_account_data: BTreeMap<RoomId, BTreeMap<String, Raw<AnyRoomAccountDataEvent>>>,
|
pub room_account_data: BTreeMap<RoomId, BTreeMap<String, Raw<AnyRoomAccountDataEvent>>>,
|
||||||
/// A map of `RoomId` to `RoomInfo`.
|
/// A map of `RoomId` to `RoomInfo`.
|
||||||
pub room_infos: BTreeMap<RoomId, RoomInfo>,
|
pub room_infos: BTreeMap<RoomId, RoomInfo>,
|
||||||
|
/// A map of `RoomId` to `ReceiptEventContent`.
|
||||||
|
pub receipts: BTreeMap<RoomId, ReceiptEventContent>,
|
||||||
|
|
||||||
/// A mapping of `RoomId` to a map of event type to a map of state key to
|
/// A mapping of `RoomId` to a map of event type to a map of state key to
|
||||||
/// `AnyStrippedStateEvent`.
|
/// `AnyStrippedStateEvent`.
|
||||||
|
@ -459,4 +500,10 @@ impl StateChanges {
|
||||||
pub fn add_notification(&mut self, room_id: &RoomId, notification: Notification) {
|
pub fn add_notification(&mut self, room_id: &RoomId, notification: Notification) {
|
||||||
self.notifications.entry(room_id.to_owned()).or_insert_with(Vec::new).push(notification);
|
self.notifications.entry(room_id.to_owned()).or_insert_with(Vec::new).push(notification);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update the `StateChanges` struct with the given room with a new
|
||||||
|
/// `Receipts`.
|
||||||
|
pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
|
||||||
|
self.receipts.insert(room_id.to_owned(), event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ mod store_key;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::BTreeSet,
|
collections::BTreeSet,
|
||||||
convert::TryFrom,
|
convert::{TryFrom, TryInto},
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Instant,
|
time::Instant,
|
||||||
|
@ -30,10 +30,12 @@ use matrix_sdk_common::{
|
||||||
async_trait,
|
async_trait,
|
||||||
events::{
|
events::{
|
||||||
presence::PresenceEvent,
|
presence::PresenceEvent,
|
||||||
|
receipt::Receipt,
|
||||||
room::member::{MemberEventContent, MembershipState},
|
room::member::{MemberEventContent, MembershipState},
|
||||||
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType,
|
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType,
|
||||||
},
|
},
|
||||||
identifiers::{RoomId, UserId},
|
identifiers::{EventId, RoomId, UserId},
|
||||||
|
receipt::ReceiptType,
|
||||||
Raw,
|
Raw,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -127,12 +129,41 @@ impl EncodeKey for (&str, &str, &str) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl EncodeKey for (&str, &str, &str, &str) {
|
||||||
|
fn encode(&self) -> Vec<u8> {
|
||||||
|
[
|
||||||
|
self.0.as_bytes(),
|
||||||
|
&[ENCODE_SEPARATOR],
|
||||||
|
self.1.as_bytes(),
|
||||||
|
&[ENCODE_SEPARATOR],
|
||||||
|
self.2.as_bytes(),
|
||||||
|
&[ENCODE_SEPARATOR],
|
||||||
|
self.3.as_bytes(),
|
||||||
|
&[ENCODE_SEPARATOR],
|
||||||
|
]
|
||||||
|
.concat()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl EncodeKey for EventType {
|
impl EncodeKey for EventType {
|
||||||
fn encode(&self) -> Vec<u8> {
|
fn encode(&self) -> Vec<u8> {
|
||||||
self.as_str().encode()
|
self.as_str().encode()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the value at `position` in encoded `key`.
|
||||||
|
///
|
||||||
|
/// The key must have been encoded with the `EncodeKey` trait. `position`
|
||||||
|
/// corresponds to the position in the tuple before the key was encoded. If it
|
||||||
|
/// wasn't encoded in a tuple, use `0`.
|
||||||
|
///
|
||||||
|
/// Returns `None` if there is no key at `position`.
|
||||||
|
pub fn decode_key_value(key: &[u8], position: usize) -> Option<String> {
|
||||||
|
let values: Vec<&[u8]> = key.split(|v| *v == ENCODE_SEPARATOR).collect();
|
||||||
|
|
||||||
|
values.get(position).map(|s| String::from_utf8_lossy(s).to_string())
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SledStore {
|
pub struct SledStore {
|
||||||
path: Option<PathBuf>,
|
path: Option<PathBuf>,
|
||||||
|
@ -152,6 +183,8 @@ pub struct SledStore {
|
||||||
stripped_room_state: Tree,
|
stripped_room_state: Tree,
|
||||||
stripped_members: Tree,
|
stripped_members: Tree,
|
||||||
presence: Tree,
|
presence: Tree,
|
||||||
|
room_user_receipts: Tree,
|
||||||
|
room_event_receipts: Tree,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for SledStore {
|
impl std::fmt::Debug for SledStore {
|
||||||
|
@ -184,6 +217,9 @@ impl SledStore {
|
||||||
let stripped_members = db.open_tree("stripped_members")?;
|
let stripped_members = db.open_tree("stripped_members")?;
|
||||||
let stripped_room_state = db.open_tree("stripped_room_state")?;
|
let stripped_room_state = db.open_tree("stripped_room_state")?;
|
||||||
|
|
||||||
|
let room_user_receipts = db.open_tree("room_user_receipts")?;
|
||||||
|
let room_event_receipts = db.open_tree("room_event_receipts")?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
path,
|
path,
|
||||||
inner: db,
|
inner: db,
|
||||||
|
@ -202,6 +238,8 @@ impl SledStore {
|
||||||
stripped_room_info,
|
stripped_room_info,
|
||||||
stripped_members,
|
stripped_members,
|
||||||
stripped_room_state,
|
stripped_room_state,
|
||||||
|
room_user_receipts,
|
||||||
|
room_event_receipts,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -459,6 +497,58 @@ impl SledStore {
|
||||||
|
|
||||||
ret?;
|
ret?;
|
||||||
|
|
||||||
|
let ret: Result<(), TransactionError<SerializationError>> =
|
||||||
|
(&self.room_user_receipts, &self.room_event_receipts).transaction(
|
||||||
|
|(room_user_receipts, room_event_receipts)| {
|
||||||
|
for (room, content) in &changes.receipts {
|
||||||
|
for (event_id, receipts) in &content.0 {
|
||||||
|
for (receipt_type, receipts) in receipts {
|
||||||
|
for (user_id, receipt) in receipts {
|
||||||
|
// Add the receipt to the room user receipts
|
||||||
|
if let Some(old) = room_user_receipts.insert(
|
||||||
|
(room.as_str(), receipt_type.as_ref(), user_id.as_str())
|
||||||
|
.encode(),
|
||||||
|
self.serialize_event(&(event_id, receipt))
|
||||||
|
.map_err(ConflictableTransactionError::Abort)?,
|
||||||
|
)? {
|
||||||
|
// Remove the old receipt from the room event receipts
|
||||||
|
let (old_event, _): (EventId, Receipt) = self
|
||||||
|
.deserialize_event(&old)
|
||||||
|
.map_err(ConflictableTransactionError::Abort)?;
|
||||||
|
room_event_receipts.remove(
|
||||||
|
(
|
||||||
|
room.as_str(),
|
||||||
|
receipt_type.as_ref(),
|
||||||
|
old_event.as_str(),
|
||||||
|
user_id.as_str(),
|
||||||
|
)
|
||||||
|
.encode(),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the receipt to the room event receipts
|
||||||
|
room_event_receipts.insert(
|
||||||
|
(
|
||||||
|
room.as_str(),
|
||||||
|
receipt_type.as_ref(),
|
||||||
|
event_id.as_str(),
|
||||||
|
user_id.as_str(),
|
||||||
|
)
|
||||||
|
.encode(),
|
||||||
|
self.serialize_event(receipt)
|
||||||
|
.map_err(ConflictableTransactionError::Abort)?,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
ret?;
|
||||||
|
|
||||||
self.inner.flush_async().await?;
|
self.inner.flush_async().await?;
|
||||||
|
|
||||||
info!("Saved changes in {:?}", now.elapsed());
|
info!("Saved changes in {:?}", now.elapsed());
|
||||||
|
@ -598,6 +688,39 @@ impl SledStore {
|
||||||
.map(|m| self.deserialize_event(&m))
|
.map(|m| self.deserialize_event(&m))
|
||||||
.transpose()?)
|
.transpose()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_user_room_receipt_event(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
user_id: &UserId,
|
||||||
|
) -> Result<Option<(EventId, Receipt)>> {
|
||||||
|
Ok(self
|
||||||
|
.room_user_receipts
|
||||||
|
.get((room_id.as_str(), receipt_type.as_ref(), user_id.as_str()).encode())?
|
||||||
|
.map(|m| self.deserialize_event(&m))
|
||||||
|
.transpose()?)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_event_room_receipt_events(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> Result<Vec<(UserId, Receipt)>> {
|
||||||
|
self.room_event_receipts
|
||||||
|
.scan_prefix((room_id.as_str(), receipt_type.as_ref(), event_id.as_str()).encode())
|
||||||
|
.map(|u| {
|
||||||
|
u.map_err(StoreError::Sled).and_then(|(key, value)| {
|
||||||
|
self.deserialize_event(&value)
|
||||||
|
.map(|receipt| {
|
||||||
|
(decode_key_value(&key, 3).unwrap().try_into().unwrap(), receipt)
|
||||||
|
})
|
||||||
|
.map_err(Into::into)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -689,6 +812,24 @@ impl StateStore for SledStore {
|
||||||
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
|
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
|
||||||
self.get_room_account_data_event(room_id, event_type).await
|
self.get_room_account_data_event(room_id, event_type).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_user_room_receipt_event(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
user_id: &UserId,
|
||||||
|
) -> Result<Option<(EventId, Receipt)>> {
|
||||||
|
self.get_user_room_receipt_event(room_id, receipt_type, user_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_event_room_receipt_events(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
receipt_type: ReceiptType,
|
||||||
|
event_id: &EventId,
|
||||||
|
) -> Result<Vec<(UserId, Receipt)>> {
|
||||||
|
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -703,7 +844,8 @@ mod test {
|
||||||
},
|
},
|
||||||
AnySyncStateEvent, EventType, Unsigned,
|
AnySyncStateEvent, EventType, Unsigned,
|
||||||
},
|
},
|
||||||
identifiers::{room_id, user_id, EventId, UserId},
|
identifiers::{event_id, room_id, user_id, EventId, UserId},
|
||||||
|
receipt::ReceiptType,
|
||||||
MilliSecondsSinceUnixEpoch, Raw,
|
MilliSecondsSinceUnixEpoch, Raw,
|
||||||
};
|
};
|
||||||
use matrix_sdk_test::async_test;
|
use matrix_sdk_test::async_test;
|
||||||
|
@ -788,4 +930,98 @@ mod test {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.is_some());
|
.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_test]
|
||||||
|
async fn test_receipts_saving() {
|
||||||
|
let store = SledStore::open().unwrap();
|
||||||
|
|
||||||
|
let room_id = room_id!("!test:localhost");
|
||||||
|
|
||||||
|
let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
|
||||||
|
let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
|
||||||
|
|
||||||
|
let first_receipt_event = serde_json::from_value(json!({
|
||||||
|
first_event_id.clone(): {
|
||||||
|
"m.read": {
|
||||||
|
user_id(): {
|
||||||
|
"ts": 1436451550453u64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let second_receipt_event = serde_json::from_value(json!({
|
||||||
|
second_event_id.clone(): {
|
||||||
|
"m.read": {
|
||||||
|
user_id(): {
|
||||||
|
"ts": 1436451551453u64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(store
|
||||||
|
.get_user_room_receipt_event(&room_id, ReceiptType::Read, &user_id())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &first_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &second_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
|
||||||
|
let mut changes = StateChanges::default();
|
||||||
|
changes.add_receipts(&room_id, first_receipt_event);
|
||||||
|
|
||||||
|
store.save_changes(&changes).await.unwrap();
|
||||||
|
assert!(store
|
||||||
|
.get_user_room_receipt_event(&room_id, ReceiptType::Read, &user_id())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_some(),);
|
||||||
|
assert_eq!(
|
||||||
|
store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &first_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.len(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &second_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
|
||||||
|
let mut changes = StateChanges::default();
|
||||||
|
changes.add_receipts(&room_id, second_receipt_event);
|
||||||
|
|
||||||
|
store.save_changes(&changes).await.unwrap();
|
||||||
|
assert!(store
|
||||||
|
.get_user_room_receipt_event(&room_id, ReceiptType::Read, &user_id())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_some());
|
||||||
|
assert!(store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &first_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_empty());
|
||||||
|
assert_eq!(
|
||||||
|
store
|
||||||
|
.get_event_room_receipt_events(&room_id, ReceiptType::Read, &second_event_id)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.len(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue