diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index b3ec59ee..9f2b57c2 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -1244,12 +1244,80 @@ mod test { use matrix_sdk_base::JsonStore; use matrix_sdk_test::{EventBuilder, EventsFile}; - use mockito::{mock, Matcher}; + use tempfile::tempdir; + use std::convert::TryFrom; + use std::path::Path; use std::str::FromStr; use std::time::Duration; + #[tokio::test] + async fn test_join_leave_room() { + let homeserver = Url::from_str(&mockito::server_url()).unwrap(); + + let room_id = RoomId::try_from("!SVkFJHzfwvuaIEawgC:localhost").unwrap(); + + let session = Session { + access_token: "1234".to_owned(), + user_id: UserId::try_from("@example:localhost").unwrap(), + device_id: "DEVICEID".to_owned(), + }; + + let _m = mock( + "GET", + Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), + ) + .with_status(200) + .with_body_from_file("../test_data/sync.json") + .create(); + + let dir = tempdir().unwrap(); + let path: &Path = dir.path(); + let store = Box::new(JsonStore::open(path).unwrap()); + + let config = ClientConfig::default().state_store(store); + let client = + Client::new_with_config(homeserver.clone(), Some(session.clone()), config).unwrap(); + + let room = client.get_joined_room(&room_id).await; + assert!(room.is_none()); + + client.sync(SyncSettings::default()).await.unwrap(); + + let room = client.get_left_room(&room_id).await; + assert!(room.is_none()); + + let room = client.get_joined_room(&room_id).await; + assert!(room.is_some()); + + // test store reloads with correct room state from JsonStore + let store = Box::new(JsonStore::open(path).unwrap()); + let config = ClientConfig::default().state_store(store); + let joined_client = Client::new_with_config(homeserver, Some(session), config).unwrap(); + + // joined room reloaded from state store + joined_client.sync(SyncSettings::default()).await.unwrap(); + let room = joined_client.get_joined_room(&room_id).await; + assert!(room.is_some()); + + let _m = mock( + "GET", + Matcher::Regex(r"^/_matrix/client/r0/sync\?.*$".to_string()), + ) + .with_status(200) + .with_body_from_file("../test_data/leave_event_sync.json") + .create(); + + joined_client.sync(SyncSettings::default()).await.unwrap(); + + let room = joined_client.get_joined_room(&room_id).await; + assert!(room.is_none()); + + let room = joined_client.get_left_room(&room_id).await; + assert!(room.is_some()); + } + #[tokio::test] async fn account_data() { let homeserver = Url::from_str(&mockito::server_url()).unwrap(); @@ -1890,7 +1958,7 @@ mod test { .with_body_from_file("../test_data/login_response.json") .create(); - let dir = tempfile::tempdir().unwrap(); + let dir = tempdir().unwrap(); // a sync response to populate our JSON store let config = ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); diff --git a/matrix_sdk_base/src/client.rs b/matrix_sdk_base/src/client.rs index 45b42959..de42704e 100644 --- a/matrix_sdk_base/src/client.rs +++ b/matrix_sdk_base/src/client.rs @@ -310,15 +310,31 @@ impl BaseClient { Ok(()) } - pub(crate) async fn get_or_create_joined_room(&self, room_id: &RoomId) -> Arc> { + pub(crate) async fn get_or_create_joined_room( + &self, + room_id: &RoomId, + ) -> Result>> { // If this used to be an invited or left room remove them from our other // hashmaps. - self.invited_rooms.write().await.remove(room_id); - self.left_rooms.write().await.remove(room_id); + if self.invited_rooms.write().await.remove(room_id).is_some() { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .room_state_change(RoomState::Joined(room_id), RoomState::Invited(room_id)) + .await?; + } + } + + if self.left_rooms.write().await.remove(room_id).is_some() { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .room_state_change(RoomState::Invited(room_id), RoomState::Left(room_id)) + .await?; + } + } let mut rooms = self.joined_rooms.write().await; #[allow(clippy::or_fun_call)] - rooms + Ok(rooms .entry(room_id.clone()) .or_insert(Arc::new(RwLock::new(Room::new( room_id, @@ -330,7 +346,7 @@ impl BaseClient { .expect("Receiving events while not being logged in") .user_id, )))) - .clone() + .clone()) } /// Get a joined room with the given room id. @@ -349,14 +365,23 @@ impl BaseClient { self.joined_rooms.clone() } - pub(crate) async fn get_or_create_invited_room(&self, room_id: &RoomId) -> Arc> { + pub(crate) async fn get_or_create_invited_room( + &self, + room_id: &RoomId, + ) -> Result>> { // Remove the left rooms only here, since a join -> invite action per // spec can't happen. - self.left_rooms.write().await.remove(room_id); + if self.left_rooms.write().await.remove(room_id).is_some() { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .room_state_change(RoomState::Invited(room_id), RoomState::Left(room_id)) + .await?; + } + } let mut rooms = self.invited_rooms.write().await; #[allow(clippy::or_fun_call)] - rooms + Ok(rooms .entry(room_id.clone()) .or_insert(Arc::new(RwLock::new(Room::new( room_id, @@ -368,7 +393,7 @@ impl BaseClient { .expect("Receiving events while not being logged in") .user_id, )))) - .clone() + .clone()) } /// Get an invited room with the given room id. @@ -387,15 +412,31 @@ impl BaseClient { self.invited_rooms.clone() } - pub(crate) async fn get_or_create_left_room(&self, room_id: &RoomId) -> Arc> { + pub(crate) async fn get_or_create_left_room( + &self, + room_id: &RoomId, + ) -> Result>> { // If this used to be an invited or joined room remove them from our other // hashmaps. - self.invited_rooms.write().await.remove(room_id); - self.joined_rooms.write().await.remove(room_id); + if self.invited_rooms.write().await.remove(room_id).is_some() { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .room_state_change(RoomState::Left(room_id), RoomState::Invited(room_id)) + .await?; + } + } + + if self.joined_rooms.write().await.remove(room_id).is_some() { + if let Some(store) = self.state_store.read().await.as_ref() { + store + .room_state_change(RoomState::Left(room_id), RoomState::Joined(room_id)) + .await?; + } + } let mut rooms = self.left_rooms.write().await; #[allow(clippy::or_fun_call)] - rooms + Ok(rooms .entry(room_id.clone()) .or_insert(Arc::new(RwLock::new(Room::new( room_id, @@ -407,7 +448,7 @@ impl BaseClient { .expect("Receiving events while not being logged in") .user_id, )))) - .clone() + .clone()) } /// Get an left room with the given room id. @@ -471,7 +512,7 @@ impl BaseClient { &self, room_id: &RoomId, event: &mut EventJson, - ) -> (Option>, bool) { + ) -> Result<(Option>, bool)> { match event.deserialize() { #[allow(unused_mut)] Ok(mut e) => { @@ -492,7 +533,7 @@ impl BaseClient { } } - let room_lock = self.get_or_create_joined_room(&room_id).await; + let room_lock = self.get_or_create_joined_room(&room_id).await?; let mut room = room_lock.write().await; if let RoomEvent::RoomMember(event) = &e { @@ -505,12 +546,12 @@ impl BaseClient { self.invalidate_group_session(room_id).await; } - (decrypted_event, changed) + Ok((decrypted_event, changed)) } else { - (decrypted_event, room.receive_timeline_event(&e)) + Ok((decrypted_event, room.receive_timeline_event(&e))) } } - _ => (None, false), + _ => Ok((None, false)), } } @@ -524,8 +565,12 @@ impl BaseClient { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - pub async fn receive_joined_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool { - let room_lock = self.get_or_create_joined_room(room_id).await; + pub async fn receive_joined_state_event( + &self, + room_id: &RoomId, + event: &StateEvent, + ) -> Result { + let room_lock = self.get_or_create_joined_room(room_id).await?; let mut room = room_lock.write().await; if let StateEvent::RoomMember(e) = event { @@ -538,9 +583,9 @@ impl BaseClient { self.invalidate_group_session(room_id).await; } - changed + Ok(changed) } else { - room.receive_state_event(event) + Ok(room.receive_state_event(event)) } } @@ -558,10 +603,10 @@ impl BaseClient { &self, room_id: &RoomId, event: &AnyStrippedStateEvent, - ) -> bool { - let room_lock = self.get_or_create_invited_room(room_id).await; + ) -> Result { + let room_lock = self.get_or_create_invited_room(room_id).await?; let mut room = room_lock.write().await; - room.receive_stripped_state_event(event) + Ok(room.receive_stripped_state_event(event)) } /// Receive a timeline event for a room the user has left and update the client state. @@ -578,14 +623,14 @@ impl BaseClient { &self, room_id: &RoomId, event: &EventJson, - ) -> bool { + ) -> Result { match event.deserialize() { Ok(e) => { - let room_lock = self.get_or_create_left_room(room_id).await; + let room_lock = self.get_or_create_left_room(room_id).await?; let mut room = room_lock.write().await; - room.receive_timeline_event(&e) + Ok(room.receive_timeline_event(&e)) } - _ => false, + _ => Ok(false), } } @@ -599,10 +644,14 @@ impl BaseClient { /// * `room_id` - The unique id of the room the event belongs to. /// /// * `event` - The event that should be handled by the client. - pub async fn receive_left_state_event(&self, room_id: &RoomId, event: &StateEvent) -> bool { - let room_lock = self.get_or_create_left_room(room_id).await; + pub async fn receive_left_state_event( + &self, + room_id: &RoomId, + event: &StateEvent, + ) -> Result { + let room_lock = self.get_or_create_left_room(room_id).await?; let mut room = room_lock.write().await; - room.receive_state_event(event) + Ok(room.receive_state_event(event)) } /// Receive a presence event from a sync response and updates the client state. @@ -700,9 +749,6 @@ impl BaseClient { } } - // TODO do we want to move the rooms to the appropriate HashMaps when the corresponding - // event comes in e.g. move a joined room to a left room when leave event comes? - // when events change state, updated_* signals to StateStore to update database self.iter_joined_rooms(response).await?; self.iter_invited_rooms(&response).await?; @@ -730,7 +776,7 @@ impl BaseClient { let matrix_room = { for event in &joined_room.state.events { if let Ok(e) = event.deserialize() { - if self.receive_joined_state_event(&room_id, &e).await { + if self.receive_joined_state_event(&room_id, &e).await? { updated = true; } self.emit_state_event(&room_id, &e, RoomStateType::Joined) @@ -738,7 +784,7 @@ impl BaseClient { } } - self.get_or_create_joined_room(&room_id).await.clone() + self.get_or_create_joined_room(&room_id).await?.clone() }; #[cfg(feature = "encryption")] @@ -771,7 +817,7 @@ impl BaseClient { let decrypted_event = { let (decrypt_ev, timeline_update) = self .receive_joined_timeline_event(room_id, &mut event) - .await; + .await?; if timeline_update { updated = true; }; @@ -852,13 +898,13 @@ impl BaseClient { let matrix_room = { for event in &left_room.state.events { if let Ok(e) = event.deserialize() { - if self.receive_left_state_event(&room_id, &e).await { + if self.receive_left_state_event(&room_id, &e).await? { updated = true; } } } - self.get_or_create_left_room(&room_id).await.clone() + self.get_or_create_left_room(&room_id).await?.clone() }; for event in &mut left_room.state.events { @@ -869,7 +915,7 @@ impl BaseClient { } for event in &mut left_room.timeline.events { - if self.receive_left_timeline_event(room_id, &event).await { + if self.receive_left_timeline_event(room_id, &event).await? { updated = true; }; @@ -899,13 +945,13 @@ impl BaseClient { let matrix_room = { for event in &invited_room.invite_state.events { if let Ok(e) = event.deserialize() { - if self.receive_invite_state_event(&room_id, &e).await { + if self.receive_invite_state_event(&room_id, &e).await? { updated = true; } } } - self.get_or_create_invited_room(&room_id).await.clone() + self.get_or_create_invited_room(&room_id).await?.clone() }; for event in &invited_room.invite_state.events { diff --git a/matrix_sdk_base/src/error.rs b/matrix_sdk_base/src/error.rs index 1cc24bb3..c1134593 100644 --- a/matrix_sdk_base/src/error.rs +++ b/matrix_sdk_base/src/error.rs @@ -32,11 +32,16 @@ pub enum Error { #[error("the queried endpoint requires authentication but was called before logging in")] AuthenticationRequired, - /// An error de/serializing type for the `StateStore` + /// A generic error returned when the state store fails not due to + /// IO or (de)serialization. + #[error("state store: {0}")] + StateStore(String), + + /// An error when (de)serializing JSON. #[error(transparent)] SerdeJson(#[from] JsonError), - /// An error de/serializing type for the `StateStore` + /// An error representing IO errors. #[error(transparent)] IoError(#[from] IoError), diff --git a/matrix_sdk_base/src/state/json_store.rs b/matrix_sdk_base/src/state/json_store.rs index 8c74532b..2b707c7e 100644 --- a/matrix_sdk_base/src/state/json_store.rs +++ b/matrix_sdk_base/src/state/json_store.rs @@ -7,6 +7,7 @@ use std::sync::{ Arc, }; +use matrix_sdk_common::identifiers::RoomId; use matrix_sdk_common::locks::RwLock; use tokio::fs as async_fs; use tokio::io::AsyncWriteExt; @@ -166,6 +167,82 @@ impl StateStore for JsonStore { .await?; file.write_all(json.as_bytes()).await.map_err(Error::from) } + + async fn room_state_change( + &self, + current_room: RoomState<&RoomId>, + previous_room: RoomState<&RoomId>, + ) -> Result<()> { + let (room_id, room_state) = match &previous_room { + RoomState::Joined(id) => (id, "joined"), + RoomState::Invited(id) => (id, "invited"), + RoomState::Left(id) => (id, "left"), + }; + + if !self.user_path_set.load(Ordering::SeqCst) { + return Err(Error::StateStore("path for JsonStore not set".into())); + } + + let mut from = self.path.read().await.clone(); + from.push("rooms"); + from.push(&format!("{}/{}.json", room_state, room_id)); + + if !from.exists() { + return Err(Error::StateStore(format!( + "file {:?} not found for JsonStore", + from + ))); + } + + match current_room { + RoomState::Joined(room_id) => { + let mut to = self.path.read().await.clone(); + to.push("rooms"); + to.push("joined"); + if !to.exists() { + async_fs::create_dir_all(&to).await?; + } + + to.push(&format!("{}.json", room_id)); + if !to.exists() { + async_fs::File::create(&to).await?; + } + // now that we know the `to` file exists move `from` to new correct state folder + tokio::fs::rename(from, to).await?; + } + RoomState::Invited(room_id) => { + let mut to = self.path.read().await.clone(); + to.push("rooms"); + to.push("invited"); + if !to.exists() { + async_fs::create_dir_all(&to).await?; + } + + to.push(&format!("{}.json", room_id)); + if !to.exists() { + async_fs::File::create(&to).await?; + } + // now that we know the `to` file exists move `from` to new correct state folder + tokio::fs::rename(from, to).await?; + } + RoomState::Left(room_id) => { + let mut to = self.path.read().await.clone(); + to.push("rooms"); + to.push("left"); + if !to.exists() { + async_fs::create_dir_all(&to).await?; + } + + to.push(&format!("{}.json", room_id)); + if !to.exists() { + async_fs::File::create(&to).await?; + } + // now that we know the `to` file exists move `from` to new correct state folder + tokio::fs::rename(from, to).await?; + } + } + Ok(()) + } } #[cfg(test)] @@ -277,6 +354,54 @@ mod test { assert_eq!(invited.get(&id), Some(&Room::new(&id, &user))); } + #[tokio::test] + async fn test_store_load_join_leave_room_state() { + let dir = tempdir().unwrap(); + let path: &Path = dir.path(); + let store = JsonStore::open(path).unwrap(); + + let id = RoomId::try_from("!roomid:example.com").unwrap(); + let user = UserId::try_from("@example:example.com").unwrap(); + + let room = Room::new(&id, &user); + store + .store_room_state(RoomState::Joined(&room)) + .await + .unwrap(); + assert!(store + .room_state_change(RoomState::Left(&id), RoomState::Joined(&id)) + .await + .is_ok()); + let AllRooms { joined, left, .. } = store.load_all_rooms().await.unwrap(); + assert_eq!(left.get(&id), Some(&Room::new(&id, &user))); + assert!(joined.is_empty()); + } + + #[tokio::test] + async fn test_store_load_invite_join_room_state() { + let dir = tempdir().unwrap(); + let path: &Path = dir.path(); + let store = JsonStore::open(path).unwrap(); + + let id = RoomId::try_from("!roomid:example.com").unwrap(); + let user = UserId::try_from("@example:example.com").unwrap(); + + let room = Room::new(&id, &user); + store + .store_room_state(RoomState::Invited(&room)) + .await + .unwrap(); + assert!(store + .room_state_change(RoomState::Joined(&id), RoomState::Invited(&id)) + .await + .is_ok()); + let AllRooms { + invited, joined, .. + } = store.load_all_rooms().await.unwrap(); + assert_eq!(joined.get(&id), Some(&Room::new(&id, &user))); + assert!(invited.is_empty()); + } + #[tokio::test] async fn test_client_sync_store() { let dir = tempdir().unwrap(); diff --git a/matrix_sdk_base/src/state/mod.rs b/matrix_sdk_base/src/state/mod.rs index 6f28a969..13c4ae54 100644 --- a/matrix_sdk_base/src/state/mod.rs +++ b/matrix_sdk_base/src/state/mod.rs @@ -50,6 +50,10 @@ impl PartialEq for ClientState { } impl ClientState { + /// Create a JSON serialize-able `ClientState`. + /// + /// This enables non sensitive information to be saved by `JsonStore`. + #[allow(clippy::eval_order_dependence)] pub async fn from_base_client(client: &BaseClient) -> ClientState { let BaseClient { sync_token, @@ -94,6 +98,15 @@ pub trait StateStore: Send + Sync { async fn store_client_state(&self, _: ClientState) -> Result<()>; /// Save the state a single `Room`. async fn store_room_state(&self, _: RoomState<&Room>) -> Result<()>; + /// Signals to the `StateStore` a room has changed state. + /// + /// This enables implementing types to update the database when `RoomState` changes. + /// A `RoomState` change is when a user joins, is invited, or leaves a room. + async fn room_state_change( + &self, + _current: RoomState<&RoomId>, + _previous: RoomState<&RoomId>, + ) -> Result<()>; } #[cfg(test)] diff --git a/test_data/leave_event_sync.json b/test_data/leave_event_sync.json new file mode 100644 index 00000000..bd6c7d23 --- /dev/null +++ b/test_data/leave_event_sync.json @@ -0,0 +1,64 @@ +{ + "account_data": { + "events": [] + }, + "to_device": { + "events": [] + }, + "device_lists": { + "changed": [], + "left": [] + }, + "presence": { + "events": [] + }, + "rooms": { + "join": {}, + "invite": {}, + "leave": { + "!SVkFJHzfwvuaIEawgC:localhost": { + "timeline": { + "events": [ + { + "content": { + "membership": "leave" + }, + "origin_server_ts": 1589578095276, + "sender": "@example:localhost", + "state_key": "@example:localhost", + "type": "m.room.member", + "unsigned": { + "replaces_state": "$blahblah", + "prev_content": { + "avatar_url": null, + "displayname": "me", + "membership": "invite" + }, + "prev_sender": "@2example:localhost", + "age": 1757 + }, + "event_id": "$lQQ116Y-XqcjpSUGpuz36rNntUvOSpTjuaIvmtQ2AwA" + } + ], + "prev_batch": "tokenTOKEN", + "limited": false + }, + "state": { + "events": [] + }, + "account_data": { + "events": [] + } + } + } + }, + "groups": { + "join": {}, + "invite": {}, + "leave": {} + }, + "device_one_time_keys_count": { + "signed_curve25519": 50 + }, + "next_batch": "s1380317562_757269739_1655566_503953763_334052043_1209862_55290918_65705002_101146" +}