From 6dbe1956954ce611dd4ba2213a2df56ee0454910 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 21 Oct 2020 21:28:02 +0200 Subject: [PATCH 1/3] improvement: flush after every request that manipulates the db --- src/client_server/account.rs | 12 +++++-- src/client_server/alias.rs | 8 +++-- src/client_server/backup.rs | 46 ++++++++++++++++-------- src/client_server/capabilities.rs | 2 +- src/client_server/config.rs | 8 +++-- src/client_server/context.rs | 2 +- src/client_server/device.rs | 16 ++++++--- src/client_server/directory.rs | 2 ++ src/client_server/filter.rs | 4 +-- src/client_server/keys.rs | 20 +++++++---- src/client_server/media.rs | 6 ++-- src/client_server/membership.rs | 42 ++++++++++++++-------- src/client_server/message.rs | 4 ++- src/client_server/mod.rs | 2 +- src/client_server/presence.rs | 4 ++- src/client_server/profile.rs | 10 ++++-- src/client_server/push.rs | 21 ++++++++--- src/client_server/read_marker.rs | 5 ++- src/client_server/redact.rs | 2 ++ src/client_server/room.rs | 6 +++- src/client_server/search.rs | 2 +- src/client_server/session.rs | 14 +++++--- src/client_server/state.rs | 56 +++++++++++++++-------------- src/client_server/tag.rs | 10 ++++-- src/client_server/thirdparty.rs | 2 +- src/client_server/to_device.rs | 4 ++- src/client_server/unversioned.rs | 2 +- src/client_server/user_directory.rs | 2 +- src/client_server/voip.rs | 2 +- src/database.rs | 5 +++ 30 files changed, 216 insertions(+), 105 deletions(-) diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 09d9f18..74f862c 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -36,7 +36,7 @@ const GUEST_NAME_LENGTH: usize = 10; feature = "conduit_bin", get("/_matrix/client/r0/register/available", data = "") )] -pub fn get_register_available_route( +pub async fn get_register_available_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -466,6 +466,8 @@ pub async fn register_route( )?; } + db.flush().await?; + Ok(register::Response { access_token: Some(token), user_id, @@ -485,7 +487,7 @@ pub async fn register_route( feature = "conduit_bin", post("/_matrix/client/r0/account/password", data = "") )] -pub fn change_password_route( +pub async fn change_password_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -535,6 +537,8 @@ pub fn change_password_route( db.users.remove_device(&sender_user, &id)?; } + db.flush().await?; + Ok(change_password::Response.into()) } @@ -547,7 +551,7 @@ pub fn change_password_route( feature = "conduit_bin", get("/_matrix/client/r0/account/whoami", data = "") )] -pub fn whoami_route(body: Ruma) -> ConduitResult { +pub async fn whoami_route(body: Ruma) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); Ok(whoami::Response { user_id: sender_user.clone(), @@ -637,6 +641,8 @@ pub async fn deactivate_route( // Remove devices and mark account as deactivated db.users.deactivate_account(&sender_user)?; + db.flush().await?; + Ok(deactivate::Response { id_server_unbind_result: ThirdPartyIdRemovalStatus::NoSupport, } diff --git a/src/client_server/alias.rs b/src/client_server/alias.rs index c2c3eb9..094e70a 100644 --- a/src/client_server/alias.rs +++ b/src/client_server/alias.rs @@ -18,7 +18,7 @@ use rocket::{delete, get, put}; feature = "conduit_bin", put("/_matrix/client/r0/directory/room/<_>", data = "") )] -pub fn create_alias_route( +pub async fn create_alias_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -29,6 +29,8 @@ pub fn create_alias_route( db.rooms .set_alias(&body.room_alias, Some(&body.room_id), &db.globals)?; + db.flush().await?; + Ok(create_alias::Response::new().into()) } @@ -36,12 +38,14 @@ pub fn create_alias_route( feature = "conduit_bin", delete("/_matrix/client/r0/directory/room/<_>", data = "") )] -pub fn delete_alias_route( +pub async fn delete_alias_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { db.rooms.set_alias(&body.room_alias, None, &db.globals)?; + db.flush().await?; + Ok(delete_alias::Response::new().into()) } diff --git a/src/client_server/backup.rs b/src/client_server/backup.rs index 6e02198..c84af0a 100644 --- a/src/client_server/backup.rs +++ b/src/client_server/backup.rs @@ -17,7 +17,7 @@ use rocket::{delete, get, post, put}; feature = "conduit_bin", post("/_matrix/client/unstable/room_keys/version", data = "") )] -pub fn create_backup_route( +pub async fn create_backup_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -26,6 +26,8 @@ pub fn create_backup_route( .key_backups .create_backup(&sender_user, &body.algorithm, &db.globals)?; + db.flush().await?; + Ok(create_backup::Response { version }.into()) } @@ -33,7 +35,7 @@ pub fn create_backup_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/version/<_>", data = "") )] -pub fn update_backup_route( +pub async fn update_backup_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -41,6 +43,8 @@ pub fn update_backup_route( db.key_backups .update_backup(&sender_user, &body.version, &body.algorithm, &db.globals)?; + db.flush().await?; + Ok(update_backup::Response.into()) } @@ -48,7 +52,7 @@ pub fn update_backup_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/version", data = "") )] -pub fn get_latest_backup_route( +pub async fn get_latest_backup_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -75,7 +79,7 @@ pub fn get_latest_backup_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/version/<_>", data = "") )] -pub fn get_backup_route( +pub async fn get_backup_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -101,7 +105,7 @@ pub fn get_backup_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/version/<_>", data = "") )] -pub fn delete_backup_route( +pub async fn delete_backup_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -109,6 +113,8 @@ pub fn delete_backup_route( db.key_backups.delete_backup(&sender_user, &body.version)?; + db.flush().await?; + Ok(delete_backup::Response.into()) } @@ -117,7 +123,7 @@ pub fn delete_backup_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/keys", data = "") )] -pub fn add_backup_keys_route( +pub async fn add_backup_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -136,6 +142,8 @@ pub fn add_backup_keys_route( } } + db.flush().await?; + Ok(add_backup_keys::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -148,7 +156,7 @@ pub fn add_backup_keys_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/keys/<_>", data = "") )] -pub fn add_backup_key_sessions_route( +pub async fn add_backup_key_sessions_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -165,6 +173,8 @@ pub fn add_backup_key_sessions_route( )? } + db.flush().await?; + Ok(add_backup_key_sessions::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -177,7 +187,7 @@ pub fn add_backup_key_sessions_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/keys/<_>/<_>", data = "") )] -pub fn add_backup_key_session_route( +pub async fn add_backup_key_session_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -192,6 +202,8 @@ pub fn add_backup_key_session_route( &db.globals, )?; + db.flush().await?; + Ok(add_backup_key_session::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -203,7 +215,7 @@ pub fn add_backup_key_session_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/keys", data = "") )] -pub fn get_backup_keys_route( +pub async fn get_backup_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -218,7 +230,7 @@ pub fn get_backup_keys_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/keys/<_>", data = "") )] -pub fn get_backup_key_sessions_route( +pub async fn get_backup_key_sessions_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -235,7 +247,7 @@ pub fn get_backup_key_sessions_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/keys/<_>/<_>", data = "") )] -pub fn get_backup_key_session_route( +pub async fn get_backup_key_session_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -252,7 +264,7 @@ pub fn get_backup_key_session_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/keys", data = "") )] -pub fn delete_backup_keys_route( +pub async fn delete_backup_keys_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -261,6 +273,8 @@ pub fn delete_backup_keys_route( db.key_backups .delete_all_keys(&sender_user, &body.version)?; + db.flush().await?; + Ok(delete_backup_keys::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -272,7 +286,7 @@ pub fn delete_backup_keys_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/keys/<_>", data = "") )] -pub fn delete_backup_key_sessions_route( +pub async fn delete_backup_key_sessions_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -281,6 +295,8 @@ pub fn delete_backup_key_sessions_route( db.key_backups .delete_room_keys(&sender_user, &body.version, &body.room_id)?; + db.flush().await?; + Ok(delete_backup_key_sessions::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -292,7 +308,7 @@ pub fn delete_backup_key_sessions_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/keys/<_>/<_>", data = "") )] -pub fn delete_backup_key_session_route( +pub async fn delete_backup_key_session_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -301,6 +317,8 @@ pub fn delete_backup_key_session_route( db.key_backups .delete_room_key(&sender_user, &body.version, &body.room_id, &body.session_id)?; + db.flush().await?; + Ok(delete_backup_key_session::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, diff --git a/src/client_server/capabilities.rs b/src/client_server/capabilities.rs index ddf90f8..54c08ba 100644 --- a/src/client_server/capabilities.rs +++ b/src/client_server/capabilities.rs @@ -9,7 +9,7 @@ use rocket::get; /// /// Get information on this server's supported feature set and other relevent capabilities. #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/capabilities"))] -pub fn get_capabilities_route() -> ConduitResult { +pub async fn get_capabilities_route() -> ConduitResult { let mut available = BTreeMap::new(); available.insert( RoomVersionId::Version5, diff --git a/src/client_server/config.rs b/src/client_server/config.rs index adff05a..dd8de64 100644 --- a/src/client_server/config.rs +++ b/src/client_server/config.rs @@ -16,7 +16,7 @@ use rocket::{get, put}; feature = "conduit_bin", put("/_matrix/client/r0/user/<_>/account_data/<_>", data = "") )] -pub fn set_global_account_data_route( +pub async fn set_global_account_data_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -40,6 +40,8 @@ pub fn set_global_account_data_route( &db.globals, )?; + db.flush().await?; + Ok(set_global_account_data::Response.into()) } @@ -47,7 +49,7 @@ pub fn set_global_account_data_route( feature = "conduit_bin", get("/_matrix/client/r0/user/<_>/account_data/<_>", data = "") )] -pub fn get_global_account_data_route( +pub async fn get_global_account_data_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -58,5 +60,7 @@ pub fn get_global_account_data_route( .get::>(None, sender_user, body.event_type.clone().into())? .ok_or(Error::BadRequest(ErrorKind::NotFound, "Data not found."))?; + db.flush().await?; + Ok(get_global_account_data::Response { account_data: data }.into()) } diff --git a/src/client_server/context.rs b/src/client_server/context.rs index a1b848a..f2a8cd4 100644 --- a/src/client_server/context.rs +++ b/src/client_server/context.rs @@ -10,7 +10,7 @@ use rocket::get; feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/context/<_>", data = "") )] -pub fn get_context_route( +pub async fn get_context_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/device.rs b/src/client_server/device.rs index 233d233..86ac511 100644 --- a/src/client_server/device.rs +++ b/src/client_server/device.rs @@ -16,7 +16,7 @@ use rocket::{delete, get, post, put}; feature = "conduit_bin", get("/_matrix/client/r0/devices", data = "") )] -pub fn get_devices_route( +pub async fn get_devices_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -35,7 +35,7 @@ pub fn get_devices_route( feature = "conduit_bin", get("/_matrix/client/r0/devices/<_>", data = "") )] -pub fn get_device_route( +pub async fn get_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -53,7 +53,7 @@ pub fn get_device_route( feature = "conduit_bin", put("/_matrix/client/r0/devices/<_>", data = "") )] -pub fn update_device_route( +pub async fn update_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -69,6 +69,8 @@ pub fn update_device_route( db.users .update_device_metadata(&sender_user, &body.device_id, &device)?; + db.flush().await?; + Ok(update_device::Response.into()) } @@ -76,7 +78,7 @@ pub fn update_device_route( feature = "conduit_bin", delete("/_matrix/client/r0/devices/<_>", data = "") )] -pub fn delete_device_route( +pub async fn delete_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -115,6 +117,8 @@ pub fn delete_device_route( db.users.remove_device(&sender_user, &body.device_id)?; + db.flush().await?; + Ok(delete_device::Response.into()) } @@ -122,7 +126,7 @@ pub fn delete_device_route( feature = "conduit_bin", post("/_matrix/client/r0/delete_devices", data = "") )] -pub fn delete_devices_route( +pub async fn delete_devices_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -163,5 +167,7 @@ pub fn delete_devices_route( db.users.remove_device(&sender_user, &device_id)? } + db.flush().await?; + Ok(delete_devices::Response.into()) } diff --git a/src/client_server/directory.rs b/src/client_server/directory.rs index c82a15f..202417e 100644 --- a/src/client_server/directory.rs +++ b/src/client_server/directory.rs @@ -87,6 +87,8 @@ pub async fn set_room_visibility_route( room::Visibility::Private => db.rooms.set_public(&body.room_id, false)?, } + db.flush().await?; + Ok(set_room_visibility::Response.into()) } diff --git a/src/client_server/filter.rs b/src/client_server/filter.rs index 4b1c3a0..b6dc583 100644 --- a/src/client_server/filter.rs +++ b/src/client_server/filter.rs @@ -5,7 +5,7 @@ use ruma::api::client::r0::filter::{self, create_filter, get_filter}; use rocket::{get, post}; #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/user/<_>/filter/<_>"))] -pub fn get_filter_route() -> ConduitResult { +pub async fn get_filter_route() -> ConduitResult { // TODO Ok(get_filter::Response::new(filter::IncomingFilterDefinition { event_fields: None, @@ -18,7 +18,7 @@ pub fn get_filter_route() -> ConduitResult { } #[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/user/<_>/filter"))] -pub fn create_filter_route() -> ConduitResult { +pub async fn create_filter_route() -> ConduitResult { // TODO Ok(create_filter::Response::new(utils::random_string(10)).into()) } diff --git a/src/client_server/keys.rs b/src/client_server/keys.rs index 2af88cf..58c79da 100644 --- a/src/client_server/keys.rs +++ b/src/client_server/keys.rs @@ -22,7 +22,7 @@ use rocket::{get, post}; feature = "conduit_bin", post("/_matrix/client/r0/keys/upload", data = "") )] -pub fn upload_keys_route( +pub async fn upload_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -58,6 +58,8 @@ pub fn upload_keys_route( } } + db.flush().await?; + Ok(upload_keys::Response { one_time_key_counts: db.users.count_one_time_keys(sender_user, sender_device)?, } @@ -68,7 +70,7 @@ pub fn upload_keys_route( feature = "conduit_bin", post("/_matrix/client/r0/keys/query", data = "") )] -pub fn get_keys_route( +pub async fn get_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -148,7 +150,7 @@ pub fn get_keys_route( feature = "conduit_bin", post("/_matrix/client/r0/keys/claim", data = "") )] -pub fn claim_keys_route( +pub async fn claim_keys_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -168,6 +170,8 @@ pub fn claim_keys_route( one_time_keys.insert(user_id.clone(), container); } + db.flush().await?; + Ok(claim_keys::Response { failures: BTreeMap::new(), one_time_keys, @@ -179,7 +183,7 @@ pub fn claim_keys_route( feature = "conduit_bin", post("/_matrix/client/unstable/keys/device_signing/upload", data = "") )] -pub fn upload_signing_keys_route( +pub async fn upload_signing_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -227,6 +231,8 @@ pub fn upload_signing_keys_route( )?; } + db.flush().await?; + Ok(upload_signing_keys::Response.into()) } @@ -234,7 +240,7 @@ pub fn upload_signing_keys_route( feature = "conduit_bin", post("/_matrix/client/unstable/keys/signatures/upload", data = "") )] -pub fn upload_signatures_route( +pub async fn upload_signatures_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -285,6 +291,8 @@ pub fn upload_signatures_route( } } + db.flush().await?; + Ok(upload_signatures::Response.into()) } @@ -292,7 +300,7 @@ pub fn upload_signatures_route( feature = "conduit_bin", get("/_matrix/client/r0/keys/changes", data = "") )] -pub fn get_key_changes_route( +pub async fn get_key_changes_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/media.rs b/src/client_server/media.rs index 551546b..96874cc 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -14,7 +14,7 @@ use std::convert::TryInto; const MXC_LENGTH: usize = 32; #[cfg_attr(feature = "conduit_bin", get("/_matrix/media/r0/config"))] -pub fn get_media_config_route( +pub async fn get_media_config_route( db: State<'_, Database>, ) -> ConduitResult { Ok(get_media_config::Response { @@ -27,7 +27,7 @@ pub fn get_media_config_route( feature = "conduit_bin", post("/_matrix/media/r0/upload", data = "") )] -pub fn create_content_route( +pub async fn create_content_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -43,6 +43,8 @@ pub fn create_content_route( &body.file, )?; + db.flush().await?; + Ok(create_content::Response { content_uri: mxc }.into()) } diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 5d028d9..3380601 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -65,17 +65,19 @@ pub async fn join_room_by_id_or_alias_route( } }; + let join_room_response = join_room_by_id_helper( + &db, + body.sender_user.as_ref(), + &room_id, + &servers, + body.third_party_signed.as_ref(), + ) + .await?; + + db.flush().await?; + Ok(join_room_by_id_or_alias::Response { - room_id: join_room_by_id_helper( - &db, - body.sender_user.as_ref(), - &room_id, - &servers, - body.third_party_signed.as_ref(), - ) - .await? - .0 - .room_id, + room_id: join_room_response.0.room_id, } .into()) } @@ -124,6 +126,8 @@ pub async fn leave_room_route( &db.account_data, )?; + db.flush().await?; + Ok(leave_room::Response::new().into()) } @@ -160,6 +164,8 @@ pub async fn invite_user_route( &db.account_data, )?; + db.flush().await?; + Ok(invite_user::Response.into()) } else { Err(Error::BadRequest(ErrorKind::NotFound, "User not found.")) @@ -211,6 +217,8 @@ pub async fn kick_user_route( &db.account_data, )?; + db.flush().await?; + Ok(kick_user::Response::new().into()) } @@ -267,6 +275,8 @@ pub async fn ban_user_route( &db.account_data, )?; + db.flush().await?; + Ok(ban_user::Response::new().into()) } @@ -314,6 +324,8 @@ pub async fn unban_user_route( &db.account_data, )?; + db.flush().await?; + Ok(unban_user::Response::new().into()) } @@ -321,7 +333,7 @@ pub async fn unban_user_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/forget", data = "") )] -pub fn forget_room_route( +pub async fn forget_room_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -329,6 +341,8 @@ pub fn forget_room_route( db.rooms.forget(&body.room_id, &sender_user)?; + db.flush().await?; + Ok(forget_room::Response::new().into()) } @@ -336,7 +350,7 @@ pub fn forget_room_route( feature = "conduit_bin", get("/_matrix/client/r0/joined_rooms", data = "") )] -pub fn joined_rooms_route( +pub async fn joined_rooms_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -356,7 +370,7 @@ pub fn joined_rooms_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/members", data = "") )] -pub fn get_member_events_route( +pub async fn get_member_events_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -384,7 +398,7 @@ pub fn get_member_events_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/joined_members", data = "") )] -pub fn joined_members_route( +pub async fn joined_members_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 9b038bf..f9c8ba1 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -77,6 +77,8 @@ pub async fn send_message_event_route( event_id.as_bytes(), )?; + db.flush().await?; + Ok(send_message_event::Response::new(event_id).into()) } @@ -84,7 +86,7 @@ pub async fn send_message_event_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/messages", data = "") )] -pub fn get_message_events_route( +pub async fn get_message_events_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/mod.rs b/src/client_server/mod.rs index e5a36f3..672957b 100644 --- a/src/client_server/mod.rs +++ b/src/client_server/mod.rs @@ -75,6 +75,6 @@ const SESSION_ID_LENGTH: usize = 256; #[cfg(feature = "conduit_bin")] #[options("/<_..>")] -pub fn options_route() -> ConduitResult { +pub async fn options_route() -> ConduitResult { Ok(send_event_to_device::Response.into()) } diff --git a/src/client_server/presence.rs b/src/client_server/presence.rs index c529932..e597c69 100644 --- a/src/client_server/presence.rs +++ b/src/client_server/presence.rs @@ -10,7 +10,7 @@ use rocket::put; feature = "conduit_bin", put("/_matrix/client/r0/presence/<_>/status", data = "") )] -pub fn set_presence_route( +pub async fn set_presence_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -41,5 +41,7 @@ pub fn set_presence_route( )?; } + db.flush().await?; + Ok(set_presence::Response.into()) } diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index d6b9212..d754ace 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -90,6 +90,8 @@ pub async fn set_displayname_route( )?; } + db.flush().await?; + Ok(set_display_name::Response.into()) } @@ -97,7 +99,7 @@ pub async fn set_displayname_route( feature = "conduit_bin", get("/_matrix/client/r0/profile/<_>/displayname", data = "") )] -pub fn get_displayname_route( +pub async fn get_displayname_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -182,6 +184,8 @@ pub async fn set_avatar_url_route( )?; } + db.flush().await?; + Ok(set_avatar_url::Response.into()) } @@ -189,7 +193,7 @@ pub async fn set_avatar_url_route( feature = "conduit_bin", get("/_matrix/client/r0/profile/<_>/avatar_url", data = "") )] -pub fn get_avatar_url_route( +pub async fn get_avatar_url_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -203,7 +207,7 @@ pub fn get_avatar_url_route( feature = "conduit_bin", get("/_matrix/client/r0/profile/<_>", data = "") )] -pub fn get_profile_route( +pub async fn get_profile_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/push.rs b/src/client_server/push.rs index 568d30c..05ba8d0 100644 --- a/src/client_server/push.rs +++ b/src/client_server/push.rs @@ -16,7 +16,7 @@ use rocket::{get, post, put}; feature = "conduit_bin", get("/_matrix/client/r0/pushrules", data = "") )] -pub fn get_pushrules_all_route( +pub async fn get_pushrules_all_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -40,11 +40,15 @@ pub fn get_pushrules_all_route( "/_matrix/client/r0/pushrules/<_>/<_>/<_>", //data = "" ))] -pub fn set_pushrule_route(//db: State<'_, Database>, +pub async fn set_pushrule_route( + db: State<'_, Database>, //body: Ruma, ) -> ConduitResult { // TODO warn!("TODO: set_pushrule_route"); + + db.flush().await?; + Ok(set_pushrule::Response.into()) } @@ -52,14 +56,19 @@ pub fn set_pushrule_route(//db: State<'_, Database>, feature = "conduit_bin", put("/_matrix/client/r0/pushrules/<_>/<_>/<_>/enabled") )] -pub fn set_pushrule_enabled_route() -> ConduitResult { +pub async fn set_pushrule_enabled_route( + db: State<'_, Database>, +) -> ConduitResult { // TODO warn!("TODO: set_pushrule_enabled_route"); + + db.flush().await?; + Ok(set_pushrule_enabled::Response.into()) } #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/pushers"))] -pub fn get_pushers_route() -> ConduitResult { +pub async fn get_pushers_route() -> ConduitResult { Ok(get_pushers::Response { pushers: Vec::new(), } @@ -67,7 +76,9 @@ pub fn get_pushers_route() -> ConduitResult { } #[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/pushers/set"))] -pub fn set_pushers_route() -> ConduitResult { +pub async fn set_pushers_route(db: State<'_, Database>) -> ConduitResult { + db.flush().await?; + Ok(get_pushers::Response { pushers: Vec::new(), } diff --git a/src/client_server/read_marker.rs b/src/client_server/read_marker.rs index 77b4141..f3e7211 100644 --- a/src/client_server/read_marker.rs +++ b/src/client_server/read_marker.rs @@ -13,7 +13,7 @@ use std::{collections::BTreeMap, time::SystemTime}; feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/read_markers", data = "") )] -pub fn set_read_marker_route( +pub async fn set_read_marker_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -71,5 +71,8 @@ pub fn set_read_marker_route( &db.globals, )?; } + + db.flush().await?; + Ok(set_read_marker::Response.into()) } diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index b4fc4bb..486eb6c 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -36,5 +36,7 @@ pub async fn redact_event_route( &db.account_data, )?; + db.flush().await?; + Ok(redact_event::Response { event_id }.into()) } diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 92d8b8e..d1d051f 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -313,6 +313,8 @@ pub async fn create_room_route( db.rooms.set_public(&room_id, true)?; } + db.flush().await?; + Ok(create_room::Response::new(room_id).into()) } @@ -320,7 +322,7 @@ pub async fn create_room_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/event/<_>", data = "") )] -pub fn get_room_event_route( +pub async fn get_room_event_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -531,6 +533,8 @@ pub async fn upgrade_room_route( &db.account_data, )?; + db.flush().await?; + // Return the replacement room id Ok(upgrade_room::Response { replacement_room }.into()) } diff --git a/src/client_server/search.rs b/src/client_server/search.rs index 6e2b7ff..0950b25 100644 --- a/src/client_server/search.rs +++ b/src/client_server/search.rs @@ -11,7 +11,7 @@ use std::collections::BTreeMap; feature = "conduit_bin", post("/_matrix/client/r0/search", data = "") )] -pub fn search_events_route( +pub async fn search_events_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/session.rs b/src/client_server/session.rs index f10bf71..c8775ef 100644 --- a/src/client_server/session.rs +++ b/src/client_server/session.rs @@ -16,7 +16,7 @@ use rocket::{get, post}; /// Get the homeserver's supported login types. One of these should be used as the `type` field /// when logging in. #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/login"))] -pub fn get_login_types_route() -> ConduitResult { +pub async fn get_login_types_route() -> ConduitResult { Ok(get_login_types::Response::new(vec![get_login_types::LoginType::Password]).into()) } @@ -34,7 +34,7 @@ pub fn get_login_types_route() -> ConduitResult { feature = "conduit_bin", post("/_matrix/client/r0/login", data = "") )] -pub fn login_route( +pub async fn login_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -93,6 +93,8 @@ pub fn login_route( body.initial_device_display_name.clone(), )?; + db.flush().await?; + Ok(login::Response { user_id, access_token: token, @@ -113,7 +115,7 @@ pub fn login_route( feature = "conduit_bin", post("/_matrix/client/r0/logout", data = "") )] -pub fn logout_route( +pub async fn logout_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -122,6 +124,8 @@ pub fn logout_route( db.users.remove_device(&sender_user, sender_device)?; + db.flush().await?; + Ok(logout::Response::new().into()) } @@ -138,7 +142,7 @@ pub fn logout_route( feature = "conduit_bin", post("/_matrix/client/r0/logout/all", data = "") )] -pub fn logout_all_route( +pub async fn logout_all_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -150,5 +154,7 @@ pub fn logout_all_route( } } + db.flush().await?; + Ok(logout_all::Response::new().into()) } diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 90abac7..eae96b5 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -37,18 +37,19 @@ pub async fn send_state_event_for_key_route( ) .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?; - Ok(send_state_event_for_key::Response::new( - send_state_event_for_key_helper( - &db, - sender_user, - &body.content, - content, - &body.room_id, - Some(body.state_key.to_owned()), - ) - .await?, + let event_id = send_state_event_for_key_helper( + &db, + sender_user, + &body.content, + content, + &body.room_id, + Some(body.state_key.to_owned()), ) - .into()) + .await?; + + db.flush().await?; + + Ok(send_state_event_for_key::Response { event_id }.into()) } #[cfg_attr( @@ -75,27 +76,28 @@ pub async fn send_state_event_for_empty_key_route( ) .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?; - Ok(send_state_event_for_empty_key::Response::new( - send_state_event_for_key_helper( - &db, - sender_user - .as_ref() - .expect("no user for send state empty key rout"), - &body.content, - json, - &body.room_id, - Some("".into()), - ) - .await?, + let event_id = send_state_event_for_key_helper( + &db, + sender_user + .as_ref() + .expect("no user for send state empty key rout"), + &body.content, + json, + &body.room_id, + Some("".into()), ) - .into()) + .await?; + + db.flush().await?; + + Ok(send_state_event_for_empty_key::Response { event_id }.into()) } #[cfg_attr( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/state", data = "") )] -pub fn get_state_events_route( +pub async fn get_state_events_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -140,7 +142,7 @@ pub fn get_state_events_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/state/<_>/<_>", data = "") )] -pub fn get_state_events_for_key_route( +pub async fn get_state_events_for_key_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -189,7 +191,7 @@ pub fn get_state_events_for_key_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/state/<_>", data = "") )] -pub fn get_state_events_for_empty_key_route( +pub async fn get_state_events_for_empty_key_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { diff --git a/src/client_server/tag.rs b/src/client_server/tag.rs index c605313..7bbf9e8 100644 --- a/src/client_server/tag.rs +++ b/src/client_server/tag.rs @@ -13,7 +13,7 @@ use rocket::{delete, get, put}; feature = "conduit_bin", put("/_matrix/client/r0/user/<_>/rooms/<_>/tags/<_>", data = "") )] -pub fn update_tag_route( +pub async fn update_tag_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -40,6 +40,8 @@ pub fn update_tag_route( &db.globals, )?; + db.flush().await?; + Ok(create_tag::Response.into()) } @@ -47,7 +49,7 @@ pub fn update_tag_route( feature = "conduit_bin", delete("/_matrix/client/r0/user/<_>/rooms/<_>/tags/<_>", data = "") )] -pub fn delete_tag_route( +pub async fn delete_tag_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -71,6 +73,8 @@ pub fn delete_tag_route( &db.globals, )?; + db.flush().await?; + Ok(delete_tag::Response.into()) } @@ -78,7 +82,7 @@ pub fn delete_tag_route( feature = "conduit_bin", get("/_matrix/client/r0/user/<_>/rooms/<_>/tags", data = "") )] -pub fn get_tags_route( +pub async fn get_tags_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/thirdparty.rs b/src/client_server/thirdparty.rs index d9b540b..c775e9b 100644 --- a/src/client_server/thirdparty.rs +++ b/src/client_server/thirdparty.rs @@ -10,7 +10,7 @@ use std::collections::BTreeMap; feature = "conduit_bin", get("/_matrix/client/r0/thirdparty/protocols") )] -pub fn get_protocols_route() -> ConduitResult { +pub async fn get_protocols_route() -> ConduitResult { warn!("TODO: get_protocols_route"); Ok(get_protocols::Response { protocols: BTreeMap::new(), diff --git a/src/client_server/to_device.rs b/src/client_server/to_device.rs index 6719dae..8cc3e29 100644 --- a/src/client_server/to_device.rs +++ b/src/client_server/to_device.rs @@ -12,7 +12,7 @@ use rocket::put; feature = "conduit_bin", put("/_matrix/client/r0/sendToDevice/<_>/<_>", data = "") )] -pub fn send_event_to_device_route( +pub async fn send_event_to_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -66,5 +66,7 @@ pub fn send_event_to_device_route( db.transaction_ids .add_txnid(sender_user, sender_device, &body.txn_id, &[])?; + db.flush().await?; + Ok(send_event_to_device::Response.into()) } diff --git a/src/client_server/unversioned.rs b/src/client_server/unversioned.rs index ea7f633..e51ed56 100644 --- a/src/client_server/unversioned.rs +++ b/src/client_server/unversioned.rs @@ -15,7 +15,7 @@ use rocket::get; /// Note: Unstable features are used while developing new features. Clients should avoid using /// unstable features in their stable releases #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/versions"))] -pub fn get_supported_versions_route() -> ConduitResult { +pub async fn get_supported_versions_route() -> ConduitResult { let mut resp = get_supported_versions::Response::new(vec!["r0.5.0".to_owned(), "r0.6.0".to_owned()]); diff --git a/src/client_server/user_directory.rs b/src/client_server/user_directory.rs index dcf48fe..5829364 100644 --- a/src/client_server/user_directory.rs +++ b/src/client_server/user_directory.rs @@ -9,7 +9,7 @@ use rocket::post; feature = "conduit_bin", post("/_matrix/client/r0/user_directory/search", data = "") )] -pub fn search_users_route( +pub async fn search_users_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/voip.rs b/src/client_server/voip.rs index 33080ea..a8db62a 100644 --- a/src/client_server/voip.rs +++ b/src/client_server/voip.rs @@ -5,7 +5,7 @@ use ruma::api::client::{error::ErrorKind, r0::message::send_message_event}; use rocket::get; #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/voip/turnServer"))] -pub fn turn_server_route() -> ConduitResult { +pub async fn turn_server_route() -> ConduitResult { Err(Error::BadRequest( ErrorKind::NotFound, "There is no turn server yet.", diff --git a/src/database.rs b/src/database.rs index 4b2cba1..6bdc32a 100644 --- a/src/database.rs +++ b/src/database.rs @@ -227,4 +227,9 @@ impl Database { // Wait until one of them finds something futures.next().await; } + + pub async fn flush(&self) -> Result<()> { + self._db.flush_async().await?; + Ok(()) + } } From 6b3934e31da101873369984af0a4ca1ec6ac574d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 21 Oct 2020 21:43:59 +0200 Subject: [PATCH 2/3] feat: configurable cache capacity --- Cargo.toml | 2 +- src/database.rs | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9e23c36..8b29be8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec- # Used for long polling tokio = "0.2.22" # Used for storing data permanently -sled = "0.34.4" +sled = { version = "0.34.4", default-features = false } # Used for emitting log entries log = "0.4.11" # Used for rocket<->ruma conversions diff --git a/src/database.rs b/src/database.rs index 6bdc32a..883ef85 100644 --- a/src/database.rs +++ b/src/database.rs @@ -10,12 +10,11 @@ pub mod users; use crate::{Error, Result}; use directories::ProjectDirs; -use log::info; -use std::fs::remove_dir_all; - use futures::StreamExt; +use log::info; use rocket::{futures, Config}; use ruma::{DeviceId, UserId}; +use std::{convert::TryFrom, fs::remove_dir_all}; pub struct Database { pub globals: globals::Globals, @@ -66,7 +65,19 @@ impl Database { .to_owned()) })?; - let db = sled::open(&path)?; + let db = sled::Config::default() + .path(&path) + .cache_capacity( + u64::try_from( + config + .get_int("cache_capacity") + .unwrap_or(1024 * 1024 * 1024), + ) + .map_err(|_| Error::BadConfig("Cache capacity needs to be a u64."))?, + ) + .print_profile_on_drop(false) + .open()?; + info!("Opened sled database at {}", path); Ok(Self { From df82314440c711d90168584f57ce42ad1e122f65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Fri, 23 Oct 2020 14:35:41 +0200 Subject: [PATCH 3/3] improvement: welcome message --- src/client_server/account.rs | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 74f862c..fad59c3 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -14,8 +14,14 @@ use ruma::{ }, }, events::{ - room::canonical_alias, room::guest_access, room::history_visibility, room::join_rules, - room::member, room::name, room::topic, EventType, + room::canonical_alias, + room::guest_access, + room::history_visibility, + room::join_rules, + room::member, + room::name, + room::{message, topic}, + EventType, }, RoomAliasId, RoomId, RoomVersionId, UserId, }; @@ -464,6 +470,32 @@ pub async fn register_route( &db.sending, &db.account_data, )?; + + // Send welcome message + db.rooms.build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMessage, + content: serde_json::to_value(message::MessageEventContent::Text( + message::TextMessageEventContent { + body: "Thanks for trying out Conduit! This software is still in development, so expect many bugs and missing features. If you have federation enabled, you can join the Conduit chat room by typing `/join #conduit:matrix.org`. **Important: Please don't join any other Matrix rooms over federation without permission from the room's admins.** Some actions might trigger bugs in other server implementations, breaking the chat for everyone else.".to_owned(), + formatted: Some(message::FormattedBody { + format: message::MessageFormat::Html, + body: "Thanks for trying out Conduit! This software is still in development, so expect many bugs and missing features. If you have federation enabled, you can join the Conduit chat room by typing /join #conduit:matrix.org. Important: Please don't join any other Matrix rooms over federation without permission from the room's admins. Some actions might trigger bugs in other server implementations, breaking the chat for everyone else.".to_owned(), + }), + relates_to: None, + }, + )) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &room_id, + &db.globals, + &db.sending, + &db.account_data, + )?; } db.flush().await?;