state_store: move room file when room state changes, test room_state_change method, doc edits
parent
1d9fccdc9f
commit
0f6caec93d
|
@ -1244,12 +1244,80 @@ mod test {
|
||||||
|
|
||||||
use matrix_sdk_base::JsonStore;
|
use matrix_sdk_base::JsonStore;
|
||||||
use matrix_sdk_test::{EventBuilder, EventsFile};
|
use matrix_sdk_test::{EventBuilder, EventsFile};
|
||||||
|
|
||||||
use mockito::{mock, Matcher};
|
use mockito::{mock, Matcher};
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
use std::path::Path;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_join_leave_room() {
|
||||||
|
let homeserver = Url::from_str(&mockito::server_url()).unwrap();
|
||||||
|
|
||||||
|
let room_id = RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap();
|
||||||
|
|
||||||
|
let session = Session {
|
||||||
|
access_token: "1234".to_owned(),
|
||||||
|
user_id: UserId::try_from("@example:localhost").unwrap(),
|
||||||
|
device_id: "DEVICEID".to_owned(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let _m = mock(
|
||||||
|
"GET",
|
||||||
|
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
|
||||||
|
)
|
||||||
|
.with_status(200)
|
||||||
|
.with_body_from_file("../test_data/sync.json")
|
||||||
|
.create();
|
||||||
|
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let path: &Path = dir.path();
|
||||||
|
let store = Box::new(JsonStore::open(path).unwrap());
|
||||||
|
|
||||||
|
let config = ClientConfig::default().state_store(store);
|
||||||
|
let client =
|
||||||
|
Client::new_with_config(homeserver.clone(), Some(session.clone()), config).unwrap();
|
||||||
|
|
||||||
|
let room = client.get_joined_room(&room_id).await;
|
||||||
|
assert!(room.is_none());
|
||||||
|
|
||||||
|
client.sync(SyncSettings::default()).await.unwrap();
|
||||||
|
|
||||||
|
let room = client.get_left_room(&room_id).await;
|
||||||
|
assert!(room.is_none());
|
||||||
|
|
||||||
|
let room = client.get_joined_room(&room_id).await;
|
||||||
|
assert!(room.is_some());
|
||||||
|
|
||||||
|
// test store reloads with correct room state from JsonStore
|
||||||
|
let store = Box::new(JsonStore::open(path).unwrap());
|
||||||
|
let config = ClientConfig::default().state_store(store);
|
||||||
|
let joined_client = Client::new_with_config(homeserver, Some(session), config).unwrap();
|
||||||
|
|
||||||
|
// joined room reloaded from state store
|
||||||
|
joined_client.sync(SyncSettings::default()).await.unwrap();
|
||||||
|
let room = joined_client.get_joined_room(&room_id).await;
|
||||||
|
assert!(room.is_some());
|
||||||
|
|
||||||
|
let _m = mock(
|
||||||
|
"GET",
|
||||||
|
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
|
||||||
|
)
|
||||||
|
.with_status(200)
|
||||||
|
.with_body_from_file("../test_data/leave_event_sync.json")
|
||||||
|
.create();
|
||||||
|
|
||||||
|
joined_client.sync(SyncSettings::default()).await.unwrap();
|
||||||
|
|
||||||
|
let room = joined_client.get_joined_room(&room_id).await;
|
||||||
|
assert!(room.is_none());
|
||||||
|
|
||||||
|
let room = joined_client.get_left_room(&room_id).await;
|
||||||
|
assert!(room.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn account_data() {
|
async fn account_data() {
|
||||||
let homeserver = Url::from_str(&mockito::server_url()).unwrap();
|
let homeserver = Url::from_str(&mockito::server_url()).unwrap();
|
||||||
|
@ -1890,7 +1958,7 @@ mod test {
|
||||||
.with_body_from_file("../test_data/login_response.json")
|
.with_body_from_file("../test_data/login_response.json")
|
||||||
.create();
|
.create();
|
||||||
|
|
||||||
let dir = tempfile::tempdir().unwrap();
|
let dir = tempdir().unwrap();
|
||||||
// a sync response to populate our JSON store
|
// a sync response to populate our JSON store
|
||||||
let config =
|
let config =
|
||||||
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));
|
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));
|
||||||
|
|
|
@ -310,15 +310,31 @@ impl BaseClient {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn get_or_create_joined_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> {
|
pub(crate) async fn get_or_create_joined_room(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
) -> Result<Arc<RwLock<Room>>> {
|
||||||
// If this used to be an invited or left room remove them from our other
|
// If this used to be an invited or left room remove them from our other
|
||||||
// hashmaps.
|
// hashmaps.
|
||||||
self.invited_rooms.write().await.remove(room_id);
|
if self.invited_rooms.write().await.remove(room_id).is_some() {
|
||||||
self.left_rooms.write().await.remove(room_id);
|
if let Some(store) = self.state_store.read().await.as_ref() {
|
||||||
|
store
|
||||||
|
.room_state_change(RoomState::Joined(room_id), RoomState::Invited(room_id))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.left_rooms.write().await.remove(room_id).is_some() {
|
||||||
|
if let Some(store) = self.state_store.read().await.as_ref() {
|
||||||
|
store
|
||||||
|
.room_state_change(RoomState::Invited(room_id), RoomState::Left(room_id))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut rooms = self.joined_rooms.write().await;
|
let mut rooms = self.joined_rooms.write().await;
|
||||||
#[allow(clippy::or_fun_call)]
|
#[allow(clippy::or_fun_call)]
|
||||||
rooms
|
Ok(rooms
|
||||||
.entry(room_id.clone())
|
.entry(room_id.clone())
|
||||||
.or_insert(Arc::new(RwLock::new(Room::new(
|
.or_insert(Arc::new(RwLock::new(Room::new(
|
||||||
room_id,
|
room_id,
|
||||||
|
@ -330,7 +346,7 @@ impl BaseClient {
|
||||||
.expect("Receiving events while not being logged in")
|
.expect("Receiving events while not being logged in")
|
||||||
.user_id,
|
.user_id,
|
||||||
))))
|
))))
|
||||||
.clone()
|
.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a joined room with the given room id.
|
/// Get a joined room with the given room id.
|
||||||
|
@ -349,14 +365,23 @@ impl BaseClient {
|
||||||
self.joined_rooms.clone()
|
self.joined_rooms.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn get_or_create_invited_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> {
|
pub(crate) async fn get_or_create_invited_room(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
) -> Result<Arc<RwLock<Room>>> {
|
||||||
// Remove the left rooms only here, since a join -> invite action per
|
// Remove the left rooms only here, since a join -> invite action per
|
||||||
// spec can't happen.
|
// spec can't happen.
|
||||||
self.left_rooms.write().await.remove(room_id);
|
if self.left_rooms.write().await.remove(room_id).is_some() {
|
||||||
|
if let Some(store) = self.state_store.read().await.as_ref() {
|
||||||
|
store
|
||||||
|
.room_state_change(RoomState::Invited(room_id), RoomState::Left(room_id))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut rooms = self.invited_rooms.write().await;
|
let mut rooms = self.invited_rooms.write().await;
|
||||||
#[allow(clippy::or_fun_call)]
|
#[allow(clippy::or_fun_call)]
|
||||||
rooms
|
Ok(rooms
|
||||||
.entry(room_id.clone())
|
.entry(room_id.clone())
|
||||||
.or_insert(Arc::new(RwLock::new(Room::new(
|
.or_insert(Arc::new(RwLock::new(Room::new(
|
||||||
room_id,
|
room_id,
|
||||||
|
@ -368,7 +393,7 @@ impl BaseClient {
|
||||||
.expect("Receiving events while not being logged in")
|
.expect("Receiving events while not being logged in")
|
||||||
.user_id,
|
.user_id,
|
||||||
))))
|
))))
|
||||||
.clone()
|
.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an invited room with the given room id.
|
/// Get an invited room with the given room id.
|
||||||
|
@ -387,15 +412,31 @@ impl BaseClient {
|
||||||
self.invited_rooms.clone()
|
self.invited_rooms.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn get_or_create_left_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> {
|
pub(crate) async fn get_or_create_left_room(
|
||||||
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
) -> Result<Arc<RwLock<Room>>> {
|
||||||
// If this used to be an invited or joined room remove them from our other
|
// If this used to be an invited or joined room remove them from our other
|
||||||
// hashmaps.
|
// hashmaps.
|
||||||
self.invited_rooms.write().await.remove(room_id);
|
if self.invited_rooms.write().await.remove(room_id).is_some() {
|
||||||
self.joined_rooms.write().await.remove(room_id);
|
if let Some(store) = self.state_store.read().await.as_ref() {
|
||||||
|
store
|
||||||
|
.room_state_change(RoomState::Left(room_id), RoomState::Invited(room_id))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.joined_rooms.write().await.remove(room_id).is_some() {
|
||||||
|
if let Some(store) = self.state_store.read().await.as_ref() {
|
||||||
|
store
|
||||||
|
.room_state_change(RoomState::Left(room_id), RoomState::Joined(room_id))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut rooms = self.left_rooms.write().await;
|
let mut rooms = self.left_rooms.write().await;
|
||||||
#[allow(clippy::or_fun_call)]
|
#[allow(clippy::or_fun_call)]
|
||||||
rooms
|
Ok(rooms
|
||||||
.entry(room_id.clone())
|
.entry(room_id.clone())
|
||||||
.or_insert(Arc::new(RwLock::new(Room::new(
|
.or_insert(Arc::new(RwLock::new(Room::new(
|
||||||
room_id,
|
room_id,
|
||||||
|
@ -407,7 +448,7 @@ impl BaseClient {
|
||||||
.expect("Receiving events while not being logged in")
|
.expect("Receiving events while not being logged in")
|
||||||
.user_id,
|
.user_id,
|
||||||
))))
|
))))
|
||||||
.clone()
|
.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an left room with the given room id.
|
/// Get an left room with the given room id.
|
||||||
|
@ -471,7 +512,7 @@ impl BaseClient {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
event: &mut EventJson<RoomEvent>,
|
event: &mut EventJson<RoomEvent>,
|
||||||
) -> (Option<EventJson<RoomEvent>>, bool) {
|
) -> Result<(Option<EventJson<RoomEvent>>, bool)> {
|
||||||
match event.deserialize() {
|
match event.deserialize() {
|
||||||
#[allow(unused_mut)]
|
#[allow(unused_mut)]
|
||||||
Ok(mut e) => {
|
Ok(mut e) => {
|
||||||
|
@ -492,7 +533,7 @@ impl BaseClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let room_lock = self.get_or_create_joined_room(&room_id).await;
|
let room_lock = self.get_or_create_joined_room(&room_id).await?;
|
||||||
let mut room = room_lock.write().await;
|
let mut room = room_lock.write().await;
|
||||||
|
|
||||||
if let RoomEvent::RoomMember(event) = &e {
|
if let RoomEvent::RoomMember(event) = &e {
|
||||||
|
@ -505,12 +546,12 @@ impl BaseClient {
|
||||||
self.invalidate_group_session(room_id).await;
|
self.invalidate_group_session(room_id).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
(decrypted_event, changed)
|
Ok((decrypted_event, changed))
|
||||||
} else {
|
} else {
|
||||||
(decrypted_event, room.receive_timeline_event(&e))
|
Ok((decrypted_event, room.receive_timeline_event(&e)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => (None, false),
|
_ => Ok((None, false)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,8 +565,12 @@ impl BaseClient {
|
||||||
/// * `room_id` - The unique id of the room the event belongs to.
|
/// * `room_id` - The unique id of the room the event belongs to.
|
||||||
///
|
///
|
||||||
/// * `event` - The event that should be handled by the client.
|
/// * `event` - The event that should be handled by the client.
|
||||||
pub async fn receive_joined_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool {
|
pub async fn receive_joined_state_event(
|
||||||
let room_lock = self.get_or_create_joined_room(room_id).await;
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
event: &StateEvent,
|
||||||
|
) -> Result<bool> {
|
||||||
|
let room_lock = self.get_or_create_joined_room(room_id).await?;
|
||||||
let mut room = room_lock.write().await;
|
let mut room = room_lock.write().await;
|
||||||
|
|
||||||
if let StateEvent::RoomMember(e) = event {
|
if let StateEvent::RoomMember(e) = event {
|
||||||
|
@ -538,9 +583,9 @@ impl BaseClient {
|
||||||
self.invalidate_group_session(room_id).await;
|
self.invalidate_group_session(room_id).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
changed
|
Ok(changed)
|
||||||
} else {
|
} else {
|
||||||
room.receive_state_event(event)
|
Ok(room.receive_state_event(event))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -558,10 +603,10 @@ impl BaseClient {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
event: &AnyStrippedStateEvent,
|
event: &AnyStrippedStateEvent,
|
||||||
) -> bool {
|
) -> Result<bool> {
|
||||||
let room_lock = self.get_or_create_invited_room(room_id).await;
|
let room_lock = self.get_or_create_invited_room(room_id).await?;
|
||||||
let mut room = room_lock.write().await;
|
let mut room = room_lock.write().await;
|
||||||
room.receive_stripped_state_event(event)
|
Ok(room.receive_stripped_state_event(event))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive a timeline event for a room the user has left and update the client state.
|
/// Receive a timeline event for a room the user has left and update the client state.
|
||||||
|
@ -578,14 +623,14 @@ impl BaseClient {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
event: &EventJson<RoomEvent>,
|
event: &EventJson<RoomEvent>,
|
||||||
) -> bool {
|
) -> Result<bool> {
|
||||||
match event.deserialize() {
|
match event.deserialize() {
|
||||||
Ok(e) => {
|
Ok(e) => {
|
||||||
let room_lock = self.get_or_create_left_room(room_id).await;
|
let room_lock = self.get_or_create_left_room(room_id).await?;
|
||||||
let mut room = room_lock.write().await;
|
let mut room = room_lock.write().await;
|
||||||
room.receive_timeline_event(&e)
|
Ok(room.receive_timeline_event(&e))
|
||||||
}
|
}
|
||||||
_ => false,
|
_ => Ok(false),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -599,10 +644,14 @@ impl BaseClient {
|
||||||
/// * `room_id` - The unique id of the room the event belongs to.
|
/// * `room_id` - The unique id of the room the event belongs to.
|
||||||
///
|
///
|
||||||
/// * `event` - The event that should be handled by the client.
|
/// * `event` - The event that should be handled by the client.
|
||||||
pub async fn receive_left_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool {
|
pub async fn receive_left_state_event(
|
||||||
let room_lock = self.get_or_create_left_room(room_id).await;
|
&self,
|
||||||
|
room_id: &RoomId,
|
||||||
|
event: &StateEvent,
|
||||||
|
) -> Result<bool> {
|
||||||
|
let room_lock = self.get_or_create_left_room(room_id).await?;
|
||||||
let mut room = room_lock.write().await;
|
let mut room = room_lock.write().await;
|
||||||
room.receive_state_event(event)
|
Ok(room.receive_state_event(event))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive a presence event from a sync response and updates the client state.
|
/// Receive a presence event from a sync response and updates the client state.
|
||||||
|
@ -700,9 +749,6 @@ impl BaseClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO do we want to move the rooms to the appropriate HashMaps when the corresponding
|
|
||||||
// event comes in e.g. move a joined room to a left room when leave event comes?
|
|
||||||
|
|
||||||
// when events change state, updated_* signals to StateStore to update database
|
// when events change state, updated_* signals to StateStore to update database
|
||||||
self.iter_joined_rooms(response).await?;
|
self.iter_joined_rooms(response).await?;
|
||||||
self.iter_invited_rooms(&response).await?;
|
self.iter_invited_rooms(&response).await?;
|
||||||
|
@ -730,7 +776,7 @@ impl BaseClient {
|
||||||
let matrix_room = {
|
let matrix_room = {
|
||||||
for event in &joined_room.state.events {
|
for event in &joined_room.state.events {
|
||||||
if let Ok(e) = event.deserialize() {
|
if let Ok(e) = event.deserialize() {
|
||||||
if self.receive_joined_state_event(&room_id, &e).await {
|
if self.receive_joined_state_event(&room_id, &e).await? {
|
||||||
updated = true;
|
updated = true;
|
||||||
}
|
}
|
||||||
self.emit_state_event(&room_id, &e, RoomStateType::Joined)
|
self.emit_state_event(&room_id, &e, RoomStateType::Joined)
|
||||||
|
@ -738,7 +784,7 @@ impl BaseClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.get_or_create_joined_room(&room_id).await.clone()
|
self.get_or_create_joined_room(&room_id).await?.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
|
@ -771,7 +817,7 @@ impl BaseClient {
|
||||||
let decrypted_event = {
|
let decrypted_event = {
|
||||||
let (decrypt_ev, timeline_update) = self
|
let (decrypt_ev, timeline_update) = self
|
||||||
.receive_joined_timeline_event(room_id, &mut event)
|
.receive_joined_timeline_event(room_id, &mut event)
|
||||||
.await;
|
.await?;
|
||||||
if timeline_update {
|
if timeline_update {
|
||||||
updated = true;
|
updated = true;
|
||||||
};
|
};
|
||||||
|
@ -852,13 +898,13 @@ impl BaseClient {
|
||||||
let matrix_room = {
|
let matrix_room = {
|
||||||
for event in &left_room.state.events {
|
for event in &left_room.state.events {
|
||||||
if let Ok(e) = event.deserialize() {
|
if let Ok(e) = event.deserialize() {
|
||||||
if self.receive_left_state_event(&room_id, &e).await {
|
if self.receive_left_state_event(&room_id, &e).await? {
|
||||||
updated = true;
|
updated = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.get_or_create_left_room(&room_id).await.clone()
|
self.get_or_create_left_room(&room_id).await?.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
for event in &mut left_room.state.events {
|
for event in &mut left_room.state.events {
|
||||||
|
@ -869,7 +915,7 @@ impl BaseClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
for event in &mut left_room.timeline.events {
|
for event in &mut left_room.timeline.events {
|
||||||
if self.receive_left_timeline_event(room_id, &event).await {
|
if self.receive_left_timeline_event(room_id, &event).await? {
|
||||||
updated = true;
|
updated = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -899,13 +945,13 @@ impl BaseClient {
|
||||||
let matrix_room = {
|
let matrix_room = {
|
||||||
for event in &invited_room.invite_state.events {
|
for event in &invited_room.invite_state.events {
|
||||||
if let Ok(e) = event.deserialize() {
|
if let Ok(e) = event.deserialize() {
|
||||||
if self.receive_invite_state_event(&room_id, &e).await {
|
if self.receive_invite_state_event(&room_id, &e).await? {
|
||||||
updated = true;
|
updated = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.get_or_create_invited_room(&room_id).await.clone()
|
self.get_or_create_invited_room(&room_id).await?.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
for event in &invited_room.invite_state.events {
|
for event in &invited_room.invite_state.events {
|
||||||
|
|
|
@ -32,11 +32,16 @@ pub enum Error {
|
||||||
#[error("the queried endpoint requires authentication but was called before logging in")]
|
#[error("the queried endpoint requires authentication but was called before logging in")]
|
||||||
AuthenticationRequired,
|
AuthenticationRequired,
|
||||||
|
|
||||||
/// An error de/serializing type for the `StateStore`
|
/// A generic error returned when the state store fails not due to
|
||||||
|
/// IO or (de)serialization.
|
||||||
|
#[error("state store: {0}")]
|
||||||
|
StateStore(String),
|
||||||
|
|
||||||
|
/// An error when (de)serializing JSON.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
SerdeJson(#[from] JsonError),
|
SerdeJson(#[from] JsonError),
|
||||||
|
|
||||||
/// An error de/serializing type for the `StateStore`
|
/// An error representing IO errors.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
IoError(#[from] IoError),
|
IoError(#[from] IoError),
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ use std::sync::{
|
||||||
Arc,
|
Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use matrix_sdk_common::identifiers::RoomId;
|
||||||
use matrix_sdk_common::locks::RwLock;
|
use matrix_sdk_common::locks::RwLock;
|
||||||
use tokio::fs as async_fs;
|
use tokio::fs as async_fs;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
@ -166,6 +167,82 @@ impl StateStore for JsonStore {
|
||||||
.await?;
|
.await?;
|
||||||
file.write_all(json.as_bytes()).await.map_err(Error::from)
|
file.write_all(json.as_bytes()).await.map_err(Error::from)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn room_state_change(
|
||||||
|
&self,
|
||||||
|
current_room: RoomState<&RoomId>,
|
||||||
|
previous_room: RoomState<&RoomId>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let (room_id, room_state) = match &previous_room {
|
||||||
|
RoomState::Joined(id) => (id, "joined"),
|
||||||
|
RoomState::Invited(id) => (id, "invited"),
|
||||||
|
RoomState::Left(id) => (id, "left"),
|
||||||
|
};
|
||||||
|
|
||||||
|
if !self.user_path_set.load(Ordering::SeqCst) {
|
||||||
|
return Err(Error::StateStore("path for JsonStore not set".into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut from = self.path.read().await.clone();
|
||||||
|
from.push("rooms");
|
||||||
|
from.push(&format!("{}/{}.json", room_state, room_id));
|
||||||
|
|
||||||
|
if !from.exists() {
|
||||||
|
return Err(Error::StateStore(format!(
|
||||||
|
"file {:?} not found for JsonStore",
|
||||||
|
from
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
match current_room {
|
||||||
|
RoomState::Joined(room_id) => {
|
||||||
|
let mut to = self.path.read().await.clone();
|
||||||
|
to.push("rooms");
|
||||||
|
to.push("joined");
|
||||||
|
if !to.exists() {
|
||||||
|
async_fs::create_dir_all(&to).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
to.push(&format!("{}.json", room_id));
|
||||||
|
if !to.exists() {
|
||||||
|
async_fs::File::create(&to).await?;
|
||||||
|
}
|
||||||
|
// now that we know the `to` file exists move `from` to new correct state folder
|
||||||
|
tokio::fs::rename(from, to).await?;
|
||||||
|
}
|
||||||
|
RoomState::Invited(room_id) => {
|
||||||
|
let mut to = self.path.read().await.clone();
|
||||||
|
to.push("rooms");
|
||||||
|
to.push("invited");
|
||||||
|
if !to.exists() {
|
||||||
|
async_fs::create_dir_all(&to).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
to.push(&format!("{}.json", room_id));
|
||||||
|
if !to.exists() {
|
||||||
|
async_fs::File::create(&to).await?;
|
||||||
|
}
|
||||||
|
// now that we know the `to` file exists move `from` to new correct state folder
|
||||||
|
tokio::fs::rename(from, to).await?;
|
||||||
|
}
|
||||||
|
RoomState::Left(room_id) => {
|
||||||
|
let mut to = self.path.read().await.clone();
|
||||||
|
to.push("rooms");
|
||||||
|
to.push("left");
|
||||||
|
if !to.exists() {
|
||||||
|
async_fs::create_dir_all(&to).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
to.push(&format!("{}.json", room_id));
|
||||||
|
if !to.exists() {
|
||||||
|
async_fs::File::create(&to).await?;
|
||||||
|
}
|
||||||
|
// now that we know the `to` file exists move `from` to new correct state folder
|
||||||
|
tokio::fs::rename(from, to).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -277,6 +354,54 @@ mod test {
|
||||||
assert_eq!(invited.get(&id), Some(&Room::new(&id, &user)));
|
assert_eq!(invited.get(&id), Some(&Room::new(&id, &user)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_store_load_join_leave_room_state() {
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let path: &Path = dir.path();
|
||||||
|
let store = JsonStore::open(path).unwrap();
|
||||||
|
|
||||||
|
let id = RoomId::try_from("!roomid:example.com").unwrap();
|
||||||
|
let user = UserId::try_from("@example:example.com").unwrap();
|
||||||
|
|
||||||
|
let room = Room::new(&id, &user);
|
||||||
|
store
|
||||||
|
.store_room_state(RoomState::Joined(&room))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(store
|
||||||
|
.room_state_change(RoomState::Left(&id), RoomState::Joined(&id))
|
||||||
|
.await
|
||||||
|
.is_ok());
|
||||||
|
let AllRooms { joined, left, .. } = store.load_all_rooms().await.unwrap();
|
||||||
|
assert_eq!(left.get(&id), Some(&Room::new(&id, &user)));
|
||||||
|
assert!(joined.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_store_load_invite_join_room_state() {
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let path: &Path = dir.path();
|
||||||
|
let store = JsonStore::open(path).unwrap();
|
||||||
|
|
||||||
|
let id = RoomId::try_from("!roomid:example.com").unwrap();
|
||||||
|
let user = UserId::try_from("@example:example.com").unwrap();
|
||||||
|
|
||||||
|
let room = Room::new(&id, &user);
|
||||||
|
store
|
||||||
|
.store_room_state(RoomState::Invited(&room))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(store
|
||||||
|
.room_state_change(RoomState::Joined(&id), RoomState::Invited(&id))
|
||||||
|
.await
|
||||||
|
.is_ok());
|
||||||
|
let AllRooms {
|
||||||
|
invited, joined, ..
|
||||||
|
} = store.load_all_rooms().await.unwrap();
|
||||||
|
assert_eq!(joined.get(&id), Some(&Room::new(&id, &user)));
|
||||||
|
assert!(invited.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_client_sync_store() {
|
async fn test_client_sync_store() {
|
||||||
let dir = tempdir().unwrap();
|
let dir = tempdir().unwrap();
|
||||||
|
|
|
@ -50,6 +50,10 @@ impl PartialEq for ClientState {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClientState {
|
impl ClientState {
|
||||||
|
/// Create a JSON serialize-able `ClientState`.
|
||||||
|
///
|
||||||
|
/// This enables non sensitive information to be saved by `JsonStore`.
|
||||||
|
#[allow(clippy::eval_order_dependence)]
|
||||||
pub async fn from_base_client(client: &BaseClient) -> ClientState {
|
pub async fn from_base_client(client: &BaseClient) -> ClientState {
|
||||||
let BaseClient {
|
let BaseClient {
|
||||||
sync_token,
|
sync_token,
|
||||||
|
@ -94,6 +98,15 @@ pub trait StateStore: Send + Sync {
|
||||||
async fn store_client_state(&self, _: ClientState) -> Result<()>;
|
async fn store_client_state(&self, _: ClientState) -> Result<()>;
|
||||||
/// Save the state a single `Room`.
|
/// Save the state a single `Room`.
|
||||||
async fn store_room_state(&self, _: RoomState<&Room>) -> Result<()>;
|
async fn store_room_state(&self, _: RoomState<&Room>) -> Result<()>;
|
||||||
|
/// Signals to the `StateStore` a room has changed state.
|
||||||
|
///
|
||||||
|
/// This enables implementing types to update the database when `RoomState` changes.
|
||||||
|
/// A `RoomState` change is when a user joins, is invited, or leaves a room.
|
||||||
|
async fn room_state_change(
|
||||||
|
&self,
|
||||||
|
_current: RoomState<&RoomId>,
|
||||||
|
_previous: RoomState<&RoomId>,
|
||||||
|
) -> Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
{
|
||||||
|
"account_data": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"to_device": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"device_lists": {
|
||||||
|
"changed": [],
|
||||||
|
"left": []
|
||||||
|
},
|
||||||
|
"presence": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"rooms": {
|
||||||
|
"join": {},
|
||||||
|
"invite": {},
|
||||||
|
"leave": {
|
||||||
|
"!SVkFJHzfwvuaIEawgC:localhost": {
|
||||||
|
"timeline": {
|
||||||
|
"events": [
|
||||||
|
{
|
||||||
|
"content": {
|
||||||
|
"membership": "leave"
|
||||||
|
},
|
||||||
|
"origin_server_ts": 1589578095276,
|
||||||
|
"sender": "@example:localhost",
|
||||||
|
"state_key": "@example:localhost",
|
||||||
|
"type": "m.room.member",
|
||||||
|
"unsigned": {
|
||||||
|
"replaces_state": "$blahblah",
|
||||||
|
"prev_content": {
|
||||||
|
"avatar_url": null,
|
||||||
|
"displayname": "me",
|
||||||
|
"membership": "invite"
|
||||||
|
},
|
||||||
|
"prev_sender": "@2example:localhost",
|
||||||
|
"age": 1757
|
||||||
|
},
|
||||||
|
"event_id": "$lQQ116Y-XqcjpSUGpuz36rNntUvOSpTjuaIvmtQ2AwA"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"prev_batch": "tokenTOKEN",
|
||||||
|
"limited": false
|
||||||
|
},
|
||||||
|
"state": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"account_data": {
|
||||||
|
"events": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"groups": {
|
||||||
|
"join": {},
|
||||||
|
"invite": {},
|
||||||
|
"leave": {}
|
||||||
|
},
|
||||||
|
"device_one_time_keys_count": {
|
||||||
|
"signed_curve25519": 50
|
||||||
|
},
|
||||||
|
"next_batch": "s1380317562_757269739_1655566_503953763_334052043_1209862_55290918_65705002_101146"
|
||||||
|
}
|
Loading…
Reference in New Issue