Merge branch 'swap-roomstate'

master
Damir Jelić 2020-05-22 10:09:47 +02:00
commit cffa2990b6
6 changed files with 304 additions and 48 deletions

View File

@ -1244,12 +1244,80 @@ mod test {
use matrix_sdk_base::JsonStore; use matrix_sdk_base::JsonStore;
use matrix_sdk_test::{EventBuilder, EventsFile}; use matrix_sdk_test::{EventBuilder, EventsFile};
use mockito::{mock, Matcher}; use mockito::{mock, Matcher};
use tempfile::tempdir;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
#[tokio::test]
async fn test_join_leave_room() {
let homeserver = Url::from_str(&mockito::server_url()).unwrap();
let room_id = RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap();
let session = Session {
access_token: "1234".to_owned(),
user_id: UserId::try_from("@example:localhost").unwrap(),
device_id: "DEVICEID".to_owned(),
};
let _m = mock(
"GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
)
.with_status(200)
.with_body_from_file("../test_data/sync.json")
.create();
let dir = tempdir().unwrap();
let path: &Path = dir.path();
let store = Box::new(JsonStore::open(path).unwrap());
let config = ClientConfig::default().state_store(store);
let client =
Client::new_with_config(homeserver.clone(), Some(session.clone()), config).unwrap();
let room = client.get_joined_room(&room_id).await;
assert!(room.is_none());
client.sync(SyncSettings::default()).await.unwrap();
let room = client.get_left_room(&room_id).await;
assert!(room.is_none());
let room = client.get_joined_room(&room_id).await;
assert!(room.is_some());
// test store reloads with correct room state from JsonStore
let store = Box::new(JsonStore::open(path).unwrap());
let config = ClientConfig::default().state_store(store);
let joined_client = Client::new_with_config(homeserver, Some(session), config).unwrap();
// joined room reloaded from state store
joined_client.sync(SyncSettings::default()).await.unwrap();
let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_some());
let _m = mock(
"GET",
Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()),
)
.with_status(200)
.with_body_from_file("../test_data/leave_event_sync.json")
.create();
joined_client.sync(SyncSettings::default()).await.unwrap();
let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_none());
let room = joined_client.get_left_room(&room_id).await;
assert!(room.is_some());
}
#[tokio::test] #[tokio::test]
async fn account_data() { async fn account_data() {
let homeserver = Url::from_str(&mockito::server_url()).unwrap(); let homeserver = Url::from_str(&mockito::server_url()).unwrap();
@ -1890,7 +1958,7 @@ mod test {
.with_body_from_file("../test_data/login_response.json") .with_body_from_file("../test_data/login_response.json")
.create(); .create();
let dir = tempfile::tempdir().unwrap(); let dir = tempdir().unwrap();
// a sync response to populate our JSON store // a sync response to populate our JSON store
let config = let config =
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));

View File

@ -310,15 +310,27 @@ impl BaseClient {
Ok(()) Ok(())
} }
pub(crate) async fn get_or_create_joined_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> { pub(crate) async fn get_or_create_joined_room(
&self,
room_id: &RoomId,
) -> Result<Arc<RwLock<Room>>> {
// If this used to be an invited or left room remove them from our other // If this used to be an invited or left room remove them from our other
// hashmaps. // hashmaps.
self.invited_rooms.write().await.remove(room_id); if self.invited_rooms.write().await.remove(room_id).is_some() {
self.left_rooms.write().await.remove(room_id); if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Invited(room_id)).await?;
}
}
if self.left_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Left(room_id)).await?;
}
}
let mut rooms = self.joined_rooms.write().await; let mut rooms = self.joined_rooms.write().await;
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
rooms Ok(rooms
.entry(room_id.clone()) .entry(room_id.clone())
.or_insert(Arc::new(RwLock::new(Room::new( .or_insert(Arc::new(RwLock::new(Room::new(
room_id, room_id,
@ -330,7 +342,7 @@ impl BaseClient {
.expect("Receiving events while not being logged in") .expect("Receiving events while not being logged in")
.user_id, .user_id,
)))) ))))
.clone() .clone())
} }
/// Get a joined room with the given room id. /// Get a joined room with the given room id.
@ -349,14 +361,21 @@ impl BaseClient {
self.joined_rooms.clone() self.joined_rooms.clone()
} }
pub(crate) async fn get_or_create_invited_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> { pub(crate) async fn get_or_create_invited_room(
&self,
room_id: &RoomId,
) -> Result<Arc<RwLock<Room>>> {
// Remove the left rooms only here, since a join -> invite action per // Remove the left rooms only here, since a join -> invite action per
// spec can't happen. // spec can't happen.
self.left_rooms.write().await.remove(room_id); if self.left_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Left(room_id)).await?;
}
}
let mut rooms = self.invited_rooms.write().await; let mut rooms = self.invited_rooms.write().await;
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
rooms Ok(rooms
.entry(room_id.clone()) .entry(room_id.clone())
.or_insert(Arc::new(RwLock::new(Room::new( .or_insert(Arc::new(RwLock::new(Room::new(
room_id, room_id,
@ -368,7 +387,7 @@ impl BaseClient {
.expect("Receiving events while not being logged in") .expect("Receiving events while not being logged in")
.user_id, .user_id,
)))) ))))
.clone() .clone())
} }
/// Get an invited room with the given room id. /// Get an invited room with the given room id.
@ -387,15 +406,27 @@ impl BaseClient {
self.invited_rooms.clone() self.invited_rooms.clone()
} }
pub(crate) async fn get_or_create_left_room(&self, room_id: &RoomId) -> Arc<RwLock<Room>> { pub(crate) async fn get_or_create_left_room(
&self,
room_id: &RoomId,
) -> Result<Arc<RwLock<Room>>> {
// If this used to be an invited or joined room remove them from our other // If this used to be an invited or joined room remove them from our other
// hashmaps. // hashmaps.
self.invited_rooms.write().await.remove(room_id); if self.invited_rooms.write().await.remove(room_id).is_some() {
self.joined_rooms.write().await.remove(room_id); if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Invited(room_id)).await?;
}
}
if self.joined_rooms.write().await.remove(room_id).is_some() {
if let Some(store) = self.state_store.read().await.as_ref() {
store.delete_room_state(RoomState::Joined(room_id)).await?;
}
}
let mut rooms = self.left_rooms.write().await; let mut rooms = self.left_rooms.write().await;
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
rooms Ok(rooms
.entry(room_id.clone()) .entry(room_id.clone())
.or_insert(Arc::new(RwLock::new(Room::new( .or_insert(Arc::new(RwLock::new(Room::new(
room_id, room_id,
@ -407,7 +438,7 @@ impl BaseClient {
.expect("Receiving events while not being logged in") .expect("Receiving events while not being logged in")
.user_id, .user_id,
)))) ))))
.clone() .clone())
} }
/// Get an left room with the given room id. /// Get an left room with the given room id.
@ -471,7 +502,7 @@ impl BaseClient {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event: &mut EventJson<RoomEvent>, event: &mut EventJson<RoomEvent>,
) -> (Option<EventJson<RoomEvent>>, bool) { ) -> Result<(Option<EventJson<RoomEvent>>, bool)> {
match event.deserialize() { match event.deserialize() {
#[allow(unused_mut)] #[allow(unused_mut)]
Ok(mut e) => { Ok(mut e) => {
@ -492,7 +523,7 @@ impl BaseClient {
} }
} }
let room_lock = self.get_or_create_joined_room(&room_id).await; let room_lock = self.get_or_create_joined_room(&room_id).await?;
let mut room = room_lock.write().await; let mut room = room_lock.write().await;
if let RoomEvent::RoomMember(event) = &e { if let RoomEvent::RoomMember(event) = &e {
@ -505,12 +536,12 @@ impl BaseClient {
self.invalidate_group_session(room_id).await; self.invalidate_group_session(room_id).await;
} }
(decrypted_event, changed) Ok((decrypted_event, changed))
} else { } else {
(decrypted_event, room.receive_timeline_event(&e)) Ok((decrypted_event, room.receive_timeline_event(&e)))
} }
} }
_ => (None, false), _ => Ok((None, false)),
} }
} }
@ -524,8 +555,12 @@ impl BaseClient {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The event that should be handled by the client. /// * `event` - The event that should be handled by the client.
pub async fn receive_joined_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool { pub async fn receive_joined_state_event(
let room_lock = self.get_or_create_joined_room(room_id).await; &self,
room_id: &RoomId,
event: &StateEvent,
) -> Result<bool> {
let room_lock = self.get_or_create_joined_room(room_id).await?;
let mut room = room_lock.write().await; let mut room = room_lock.write().await;
if let StateEvent::RoomMember(e) = event { if let StateEvent::RoomMember(e) = event {
@ -538,9 +573,9 @@ impl BaseClient {
self.invalidate_group_session(room_id).await; self.invalidate_group_session(room_id).await;
} }
changed Ok(changed)
} else { } else {
room.receive_state_event(event) Ok(room.receive_state_event(event))
} }
} }
@ -558,10 +593,10 @@ impl BaseClient {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event: &AnyStrippedStateEvent, event: &AnyStrippedStateEvent,
) -> bool { ) -> Result<bool> {
let room_lock = self.get_or_create_invited_room(room_id).await; let room_lock = self.get_or_create_invited_room(room_id).await?;
let mut room = room_lock.write().await; let mut room = room_lock.write().await;
room.receive_stripped_state_event(event) Ok(room.receive_stripped_state_event(event))
} }
/// Receive a timeline event for a room the user has left and update the client state. /// Receive a timeline event for a room the user has left and update the client state.
@ -578,14 +613,14 @@ impl BaseClient {
&self, &self,
room_id: &RoomId, room_id: &RoomId,
event: &EventJson<RoomEvent>, event: &EventJson<RoomEvent>,
) -> bool { ) -> Result<bool> {
match event.deserialize() { match event.deserialize() {
Ok(e) => { Ok(e) => {
let room_lock = self.get_or_create_left_room(room_id).await; let room_lock = self.get_or_create_left_room(room_id).await?;
let mut room = room_lock.write().await; let mut room = room_lock.write().await;
room.receive_timeline_event(&e) Ok(room.receive_timeline_event(&e))
} }
_ => false, _ => Ok(false),
} }
} }
@ -599,10 +634,14 @@ impl BaseClient {
/// * `room_id` - The unique id of the room the event belongs to. /// * `room_id` - The unique id of the room the event belongs to.
/// ///
/// * `event` - The event that should be handled by the client. /// * `event` - The event that should be handled by the client.
pub async fn receive_left_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool { pub async fn receive_left_state_event(
let room_lock = self.get_or_create_left_room(room_id).await; &self,
room_id: &RoomId,
event: &StateEvent,
) -> Result<bool> {
let room_lock = self.get_or_create_left_room(room_id).await?;
let mut room = room_lock.write().await; let mut room = room_lock.write().await;
room.receive_state_event(event) Ok(room.receive_state_event(event))
} }
/// Receive a presence event from a sync response and updates the client state. /// Receive a presence event from a sync response and updates the client state.
@ -700,9 +739,6 @@ impl BaseClient {
} }
} }
// TODO do we want to move the rooms to the appropriate HashMaps when the corresponding
// event comes in e.g. move a joined room to a left room when leave event comes?
// when events change state, updated_* signals to StateStore to update database // when events change state, updated_* signals to StateStore to update database
self.iter_joined_rooms(response).await?; self.iter_joined_rooms(response).await?;
self.iter_invited_rooms(&response).await?; self.iter_invited_rooms(&response).await?;
@ -730,7 +766,7 @@ impl BaseClient {
let matrix_room = { let matrix_room = {
for event in &joined_room.state.events { for event in &joined_room.state.events {
if let Ok(e) = event.deserialize() { if let Ok(e) = event.deserialize() {
if self.receive_joined_state_event(&room_id, &e).await { if self.receive_joined_state_event(&room_id, &e).await? {
updated = true; updated = true;
} }
self.emit_state_event(&room_id, &e, RoomStateType::Joined) self.emit_state_event(&room_id, &e, RoomStateType::Joined)
@ -738,7 +774,7 @@ impl BaseClient {
} }
} }
self.get_or_create_joined_room(&room_id).await.clone() self.get_or_create_joined_room(&room_id).await?.clone()
}; };
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
@ -771,7 +807,7 @@ impl BaseClient {
let decrypted_event = { let decrypted_event = {
let (decrypt_ev, timeline_update) = self let (decrypt_ev, timeline_update) = self
.receive_joined_timeline_event(room_id, &mut event) .receive_joined_timeline_event(room_id, &mut event)
.await; .await?;
if timeline_update { if timeline_update {
updated = true; updated = true;
}; };
@ -852,13 +888,13 @@ impl BaseClient {
let matrix_room = { let matrix_room = {
for event in &left_room.state.events { for event in &left_room.state.events {
if let Ok(e) = event.deserialize() { if let Ok(e) = event.deserialize() {
if self.receive_left_state_event(&room_id, &e).await { if self.receive_left_state_event(&room_id, &e).await? {
updated = true; updated = true;
} }
} }
} }
self.get_or_create_left_room(&room_id).await.clone() self.get_or_create_left_room(&room_id).await?.clone()
}; };
for event in &mut left_room.state.events { for event in &mut left_room.state.events {
@ -869,7 +905,7 @@ impl BaseClient {
} }
for event in &mut left_room.timeline.events { for event in &mut left_room.timeline.events {
if self.receive_left_timeline_event(room_id, &event).await { if self.receive_left_timeline_event(room_id, &event).await? {
updated = true; updated = true;
}; };
@ -899,13 +935,13 @@ impl BaseClient {
let matrix_room = { let matrix_room = {
for event in &invited_room.invite_state.events { for event in &invited_room.invite_state.events {
if let Ok(e) = event.deserialize() { if let Ok(e) = event.deserialize() {
if self.receive_invite_state_event(&room_id, &e).await { if self.receive_invite_state_event(&room_id, &e).await? {
updated = true; updated = true;
} }
} }
} }
self.get_or_create_invited_room(&room_id).await.clone() self.get_or_create_invited_room(&room_id).await?.clone()
}; };
for event in &invited_room.invite_state.events { for event in &invited_room.invite_state.events {

View File

@ -32,11 +32,16 @@ pub enum Error {
#[error("the queried endpoint requires authentication but was called before logging in")] #[error("the queried endpoint requires authentication but was called before logging in")]
AuthenticationRequired, AuthenticationRequired,
/// An error de/serializing type for the `StateStore` /// A generic error returned when the state store fails not due to
/// IO or (de)serialization.
#[error("state store: {0}")]
StateStore(String),
/// An error when (de)serializing JSON.
#[error(transparent)] #[error(transparent)]
SerdeJson(#[from] JsonError), SerdeJson(#[from] JsonError),
/// An error de/serializing type for the `StateStore` /// An error representing IO errors.
#[error(transparent)] #[error(transparent)]
IoError(#[from] IoError), IoError(#[from] IoError),

View File

@ -7,6 +7,7 @@ use std::sync::{
Arc, Arc,
}; };
use matrix_sdk_common::identifiers::RoomId;
use matrix_sdk_common::locks::RwLock; use matrix_sdk_common::locks::RwLock;
use tokio::fs as async_fs; use tokio::fs as async_fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@ -166,6 +167,28 @@ impl StateStore for JsonStore {
.await?; .await?;
file.write_all(json.as_bytes()).await.map_err(Error::from) file.write_all(json.as_bytes()).await.map_err(Error::from)
} }
async fn delete_room_state(&self, room: RoomState<&RoomId>) -> Result<()> {
let (room_id, room_state) = match &room {
RoomState::Joined(id) => (id, "joined"),
RoomState::Invited(id) => (id, "invited"),
RoomState::Left(id) => (id, "left"),
};
if !self.user_path_set.load(Ordering::SeqCst) {
return Err(Error::StateStore("path for JsonStore not set".into()));
}
let mut to_del = self.path.read().await.clone();
to_del.push("rooms");
to_del.push(&format!("{}/{}.json", room_state, room_id));
if !to_del.exists() {
return Err(Error::StateStore(format!("file {:?} not found", to_del)));
}
tokio::fs::remove_file(to_del).await.map_err(Error::from)
}
} }
#[cfg(test)] #[cfg(test)]
@ -277,6 +300,53 @@ mod test {
assert_eq!(invited.get(&id), Some(&Room::new(&id, &user))); assert_eq!(invited.get(&id), Some(&Room::new(&id, &user)));
} }
#[tokio::test]
async fn test_store_load_join_leave_room_state() {
let dir = tempdir().unwrap();
let path: &Path = dir.path();
let store = JsonStore::open(path).unwrap();
let id = RoomId::try_from("!roomid:example.com").unwrap();
let user = UserId::try_from("@example:example.com").unwrap();
let room = Room::new(&id, &user);
store
.store_room_state(RoomState::Joined(&room))
.await
.unwrap();
assert!(store
.delete_room_state(RoomState::Joined(&id))
.await
.is_ok());
let AllRooms { joined, .. } = store.load_all_rooms().await.unwrap();
// test that we have removed the correct room
assert!(joined.is_empty());
}
#[tokio::test]
async fn test_store_load_invite_join_room_state() {
let dir = tempdir().unwrap();
let path: &Path = dir.path();
let store = JsonStore::open(path).unwrap();
let id = RoomId::try_from("!roomid:example.com").unwrap();
let user = UserId::try_from("@example:example.com").unwrap();
let room = Room::new(&id, &user);
store
.store_room_state(RoomState::Invited(&room))
.await
.unwrap();
assert!(store
.delete_room_state(RoomState::Invited(&id))
.await
.is_ok());
let AllRooms { invited, .. } = store.load_all_rooms().await.unwrap();
// test that we have removed the correct room
assert!(invited.is_empty());
}
#[tokio::test] #[tokio::test]
async fn test_client_sync_store() { async fn test_client_sync_store() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();

View File

@ -50,6 +50,11 @@ impl PartialEq for ClientState {
} }
impl ClientState { impl ClientState {
/// Create a JSON serialize-able `ClientState`.
///
/// This enables non sensitive information to be saved by `JsonStore`.
#[allow(clippy::eval_order_dependence)]
// TODO is this ok ^^^?? https://github.com/rust-lang/rust-clippy/issues/4637
pub async fn from_base_client(client: &BaseClient) -> ClientState { pub async fn from_base_client(client: &BaseClient) -> ClientState {
let BaseClient { let BaseClient {
sync_token, sync_token,
@ -86,14 +91,22 @@ pub trait StateStore: Send + Sync {
/// An `Option::None` should be returned only if the `StateStore` tries to /// An `Option::None` should be returned only if the `StateStore` tries to
/// load but no state has been stored. /// load but no state has been stored.
async fn load_client_state(&self, _: &Session) -> Result<Option<ClientState>>; async fn load_client_state(&self, _: &Session) -> Result<Option<ClientState>>;
/// Load the state of all `Room`s. /// Load the state of all `Room`s.
/// ///
/// This will be mapped over in the client in order to store `Room`s in an async safe way. /// This will be mapped over in the client in order to store `Room`s in an async safe way.
async fn load_all_rooms(&self) -> Result<AllRooms>; async fn load_all_rooms(&self) -> Result<AllRooms>;
/// Save the current state of the `BaseClient` using the `StateStore::Store` type. /// Save the current state of the `BaseClient` using the `StateStore::Store` type.
async fn store_client_state(&self, _: ClientState) -> Result<()>; async fn store_client_state(&self, _: ClientState) -> Result<()>;
/// Save the state a single `Room`. /// Save the state a single `Room`.
async fn store_room_state(&self, _: RoomState<&Room>) -> Result<()>; async fn store_room_state(&self, _: RoomState<&Room>) -> Result<()>;
/// Remove state for a room.
///
/// This is used when a user leaves a room or rejects an invitation.
async fn delete_room_state(&self, _room: RoomState<&RoomId>) -> Result<()>;
} }
#[cfg(test)] #[cfg(test)]

View File

@ -0,0 +1,64 @@
{
"account_data": {
"events": []
},
"to_device": {
"events": []
},
"device_lists": {
"changed": [],
"left": []
},
"presence": {
"events": []
},
"rooms": {
"join": {},
"invite": {},
"leave": {
"!SVkFJHzfwvuaIEawgC:localhost": {
"timeline": {
"events": [
{
"content": {
"membership": "leave"
},
"origin_server_ts": 1589578095276,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"type": "m.room.member",
"unsigned": {
"replaces_state": "$blahblah",
"prev_content": {
"avatar_url": null,
"displayname": "me",
"membership": "invite"
},
"prev_sender": "@2example:localhost",
"age": 1757
},
"event_id": "$lQQ116Y-XqcjpSUGpuz36rNntUvOSpTjuaIvmtQ2AwA"
}
],
"prev_batch": "tokenTOKEN",
"limited": false
},
"state": {
"events": []
},
"account_data": {
"events": []
}
}
}
},
"groups": {
"join": {},
"invite": {},
"leave": {}
},
"device_one_time_keys_count": {
"signed_curve25519": 50
},
"next_batch": "s1380317562_757269739_1655566_503953763_334052043_1209862_55290918_65705002_101146"
}