store: More work, add the ability to store member events.
parent
045ab25fb7
commit
5323e6e270
|
@ -1,30 +1,55 @@
|
||||||
|
use std::{collections::BTreeMap, convert::TryFrom};
|
||||||
|
|
||||||
|
use matrix_sdk_common::{
|
||||||
|
events::{room::member::MemberEventContent, SyncStateEvent},
|
||||||
|
identifiers::{RoomId, UserId},
|
||||||
|
};
|
||||||
use serde_json;
|
use serde_json;
|
||||||
|
|
||||||
use sled::{transaction::TransactionalTree, Config, Db, Tree};
|
use sled::{transaction::TransactionResult, Config, Db, Transactional, Tree};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Store {
|
pub struct Store {
|
||||||
inner: Db,
|
inner: Db,
|
||||||
session_tree: Tree,
|
session_tree: Tree,
|
||||||
|
member_tree: Tree,
|
||||||
}
|
}
|
||||||
|
|
||||||
use crate::Session;
|
use crate::Session;
|
||||||
|
|
||||||
pub struct TransactionalStore<'a> {
|
#[derive(Debug, Default)]
|
||||||
inner: &'a TransactionalTree,
|
pub struct StateChanges {
|
||||||
|
session: Option<Session>,
|
||||||
|
members: BTreeMap<RoomId, BTreeMap<UserId, SyncStateEvent<MemberEventContent>>>,
|
||||||
|
|
||||||
|
added_user_ids: BTreeMap<RoomId, UserId>,
|
||||||
|
invited_user_ids: BTreeMap<RoomId, UserId>,
|
||||||
|
removed_user_ids: BTreeMap<RoomId, UserId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> std::fmt::Debug for TransactionalStore<'a> {
|
impl StateChanges {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
pub fn add_event(&mut self, room_id: &RoomId, event: SyncStateEvent<MemberEventContent>) {
|
||||||
f.debug_struct("TransactionalStore").finish()
|
let user_id = UserId::try_from(event.state_key.as_str()).unwrap();
|
||||||
|
self.members
|
||||||
|
.entry(room_id.to_owned())
|
||||||
|
.or_insert_with(BTreeMap::new)
|
||||||
|
.insert(user_id, event);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_event(room_id: &RoomId, event: SyncStateEvent<MemberEventContent>) -> Self {
|
||||||
|
let mut changes = Self::default();
|
||||||
|
changes.add_event(room_id, event);
|
||||||
|
|
||||||
|
changes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> TransactionalStore<'a> {
|
impl From<Session> for StateChanges {
|
||||||
pub fn store_session(&self, session: &Session) {
|
fn from(session: Session) -> Self {
|
||||||
self.inner
|
Self {
|
||||||
.insert("session", serde_json::to_vec(session).unwrap())
|
session: Some(session),
|
||||||
.unwrap();
|
..Default::default()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,44 +57,72 @@ impl Store {
|
||||||
pub fn open() -> Self {
|
pub fn open() -> Self {
|
||||||
let db = Config::new().temporary(true).open().unwrap();
|
let db = Config::new().temporary(true).open().unwrap();
|
||||||
let session_tree = db.open_tree("session").unwrap();
|
let session_tree = db.open_tree("session").unwrap();
|
||||||
|
let member_tree = db.open_tree("members").unwrap();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
inner: db,
|
inner: db,
|
||||||
session_tree,
|
session_tree,
|
||||||
|
member_tree,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn save_changes(&self, changes: &StateChanges) {
|
||||||
|
let ret: TransactionResult<()> =
|
||||||
|
(&self.session_tree, &self.member_tree).transaction(|(session, members)| {
|
||||||
|
if let Some(s) = &changes.session {
|
||||||
|
session.insert("session", serde_json::to_vec(s).unwrap())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (room, events) in &changes.members {
|
||||||
|
for (user_id, event) in events {
|
||||||
|
members.insert(
|
||||||
|
format!("{}{}", room.as_str(), user_id.as_str()).as_str(),
|
||||||
|
serde_json::to_vec(&event).unwrap(),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
ret.unwrap();
|
||||||
|
|
||||||
|
self.inner.flush_async().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_member_event(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
state_key: &UserId,
|
||||||
|
) -> Option<SyncStateEvent<MemberEventContent>> {
|
||||||
|
self.member_tree
|
||||||
|
.get(format!("{}{}", room_id.as_str(), state_key.as_str()))
|
||||||
|
.unwrap()
|
||||||
|
.map(|v| serde_json::from_slice(&v).unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_session(&self) -> Option<Session> {
|
pub fn get_session(&self) -> Option<Session> {
|
||||||
self.session_tree
|
self.session_tree
|
||||||
.get("session")
|
.get("session")
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map(|s| serde_json::from_slice(&s).unwrap())
|
.map(|s| serde_json::from_slice(&s).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn transaction<F, R>(&self, callback: F) -> R
|
|
||||||
where
|
|
||||||
F: Fn(&TransactionalStore) -> R,
|
|
||||||
{
|
|
||||||
let ret = self
|
|
||||||
.session_tree
|
|
||||||
.transaction::<_, _, ()>(|t| {
|
|
||||||
let transaction = TransactionalStore { inner: t };
|
|
||||||
Ok(callback(&transaction))
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
self.inner.flush_async().await.unwrap();
|
|
||||||
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use matrix_sdk_common::identifiers::{user_id, DeviceIdBox, UserId};
|
use std::{convert::TryFrom, time::SystemTime};
|
||||||
|
|
||||||
|
use matrix_sdk_common::{
|
||||||
|
events::{
|
||||||
|
room::member::{MemberEventContent, MembershipState},
|
||||||
|
SyncStateEvent, Unsigned,
|
||||||
|
},
|
||||||
|
identifiers::{room_id, user_id, DeviceIdBox, EventId, UserId},
|
||||||
|
};
|
||||||
use matrix_sdk_test::async_test;
|
use matrix_sdk_test::async_test;
|
||||||
|
|
||||||
use super::Store;
|
use super::{StateChanges, Store};
|
||||||
use crate::Session;
|
use crate::Session;
|
||||||
|
|
||||||
fn user_id() -> UserId {
|
fn user_id() -> UserId {
|
||||||
|
@ -80,6 +133,26 @@ mod test {
|
||||||
"DEVICEID".into()
|
"DEVICEID".into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn membership_event() -> SyncStateEvent<MemberEventContent> {
|
||||||
|
let content = MemberEventContent {
|
||||||
|
avatar_url: None,
|
||||||
|
displayname: None,
|
||||||
|
is_direct: None,
|
||||||
|
third_party_invite: None,
|
||||||
|
membership: MembershipState::Join,
|
||||||
|
};
|
||||||
|
|
||||||
|
SyncStateEvent {
|
||||||
|
event_id: EventId::try_from("$h29iv0s8:example.com").unwrap(),
|
||||||
|
content,
|
||||||
|
sender: user_id(),
|
||||||
|
origin_server_ts: SystemTime::now(),
|
||||||
|
state_key: user_id().to_string(),
|
||||||
|
prev_content: None,
|
||||||
|
unsigned: Unsigned::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[async_test]
|
||||||
async fn test_session_saving() {
|
async fn test_session_saving() {
|
||||||
let session = Session {
|
let session = Session {
|
||||||
|
@ -90,14 +163,22 @@ mod test {
|
||||||
|
|
||||||
let store = Store::open();
|
let store = Store::open();
|
||||||
|
|
||||||
store
|
store.save_changes(&session.clone().into()).await;
|
||||||
.transaction(|t| {
|
|
||||||
t.store_session(&session);
|
|
||||||
()
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
let stored_session = store.get_session().unwrap();
|
let stored_session = store.get_session().unwrap();
|
||||||
|
|
||||||
assert_eq!(session, stored_session);
|
assert_eq!(session, stored_session);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_test]
|
||||||
|
async fn test_member_saving() {
|
||||||
|
let store = Store::open();
|
||||||
|
let room_id = room_id!("!test:localhost");
|
||||||
|
let user_id = user_id();
|
||||||
|
|
||||||
|
assert!(store.get_member_event(&room_id, &user_id).is_none());
|
||||||
|
let changes = StateChanges::from_event(&room_id!("!test:localhost"), membership_event());
|
||||||
|
|
||||||
|
store.save_changes(&changes).await;
|
||||||
|
assert!(store.get_member_event(&room_id, &user_id).is_some());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue