From 6dbe1956954ce611dd4ba2213a2df56ee0454910 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Wed, 21 Oct 2020 21:28:02 +0200 Subject: [PATCH] improvement: flush after every request that manipulates the db --- src/client_server/account.rs | 12 +++++-- src/client_server/alias.rs | 8 +++-- src/client_server/backup.rs | 46 ++++++++++++++++-------- src/client_server/capabilities.rs | 2 +- src/client_server/config.rs | 8 +++-- src/client_server/context.rs | 2 +- src/client_server/device.rs | 16 ++++++--- src/client_server/directory.rs | 2 ++ src/client_server/filter.rs | 4 +-- src/client_server/keys.rs | 20 +++++++---- src/client_server/media.rs | 6 ++-- src/client_server/membership.rs | 42 ++++++++++++++-------- src/client_server/message.rs | 4 ++- src/client_server/mod.rs | 2 +- src/client_server/presence.rs | 4 ++- src/client_server/profile.rs | 10 ++++-- src/client_server/push.rs | 21 ++++++++--- src/client_server/read_marker.rs | 5 ++- src/client_server/redact.rs | 2 ++ src/client_server/room.rs | 6 +++- src/client_server/search.rs | 2 +- src/client_server/session.rs | 14 +++++--- src/client_server/state.rs | 56 +++++++++++++++-------------- src/client_server/tag.rs | 10 ++++-- src/client_server/thirdparty.rs | 2 +- src/client_server/to_device.rs | 4 ++- src/client_server/unversioned.rs | 2 +- src/client_server/user_directory.rs | 2 +- src/client_server/voip.rs | 2 +- src/database.rs | 5 +++ 30 files changed, 216 insertions(+), 105 deletions(-) diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 09d9f18..74f862c 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -36,7 +36,7 @@ const GUEST_NAME_LENGTH: usize = 10; feature = "conduit_bin", get("/_matrix/client/r0/register/available", data = "") )] -pub fn get_register_available_route( +pub async fn get_register_available_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -466,6 +466,8 @@ pub async fn register_route( )?; } + db.flush().await?; + Ok(register::Response { access_token: Some(token), user_id, @@ -485,7 +487,7 @@ pub async fn register_route( feature = "conduit_bin", post("/_matrix/client/r0/account/password", data = "") )] -pub fn change_password_route( +pub async fn change_password_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -535,6 +537,8 @@ pub fn change_password_route( db.users.remove_device(&sender_user, &id)?; } + db.flush().await?; + Ok(change_password::Response.into()) } @@ -547,7 +551,7 @@ pub fn change_password_route( feature = "conduit_bin", get("/_matrix/client/r0/account/whoami", data = "") )] -pub fn whoami_route(body: Ruma) -> ConduitResult { +pub async fn whoami_route(body: Ruma) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); Ok(whoami::Response { user_id: sender_user.clone(), @@ -637,6 +641,8 @@ pub async fn deactivate_route( // Remove devices and mark account as deactivated db.users.deactivate_account(&sender_user)?; + db.flush().await?; + Ok(deactivate::Response { id_server_unbind_result: ThirdPartyIdRemovalStatus::NoSupport, } diff --git a/src/client_server/alias.rs b/src/client_server/alias.rs index c2c3eb9..094e70a 100644 --- a/src/client_server/alias.rs +++ b/src/client_server/alias.rs @@ -18,7 +18,7 @@ use rocket::{delete, get, put}; feature = "conduit_bin", put("/_matrix/client/r0/directory/room/<_>", data = "") )] -pub fn create_alias_route( +pub async fn create_alias_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -29,6 +29,8 @@ pub fn create_alias_route( db.rooms .set_alias(&body.room_alias, Some(&body.room_id), &db.globals)?; + db.flush().await?; + Ok(create_alias::Response::new().into()) } @@ -36,12 +38,14 @@ pub fn create_alias_route( feature = "conduit_bin", delete("/_matrix/client/r0/directory/room/<_>", data = "") )] -pub fn delete_alias_route( +pub async fn delete_alias_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { db.rooms.set_alias(&body.room_alias, None, &db.globals)?; + db.flush().await?; + Ok(delete_alias::Response::new().into()) } diff --git a/src/client_server/backup.rs b/src/client_server/backup.rs index 6e02198..c84af0a 100644 --- a/src/client_server/backup.rs +++ b/src/client_server/backup.rs @@ -17,7 +17,7 @@ use rocket::{delete, get, post, put}; feature = "conduit_bin", post("/_matrix/client/unstable/room_keys/version", data = "") )] -pub fn create_backup_route( +pub async fn create_backup_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -26,6 +26,8 @@ pub fn create_backup_route( .key_backups .create_backup(&sender_user, &body.algorithm, &db.globals)?; + db.flush().await?; + Ok(create_backup::Response { version }.into()) } @@ -33,7 +35,7 @@ pub fn create_backup_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/version/<_>", data = "") )] -pub fn update_backup_route( +pub async fn update_backup_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -41,6 +43,8 @@ pub fn update_backup_route( db.key_backups .update_backup(&sender_user, &body.version, &body.algorithm, &db.globals)?; + db.flush().await?; + Ok(update_backup::Response.into()) } @@ -48,7 +52,7 @@ pub fn update_backup_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/version", data = "") )] -pub fn get_latest_backup_route( +pub async fn get_latest_backup_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -75,7 +79,7 @@ pub fn get_latest_backup_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/version/<_>", data = "") )] -pub fn get_backup_route( +pub async fn get_backup_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -101,7 +105,7 @@ pub fn get_backup_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/version/<_>", data = "") )] -pub fn delete_backup_route( +pub async fn delete_backup_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -109,6 +113,8 @@ pub fn delete_backup_route( db.key_backups.delete_backup(&sender_user, &body.version)?; + db.flush().await?; + Ok(delete_backup::Response.into()) } @@ -117,7 +123,7 @@ pub fn delete_backup_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/keys", data = "") )] -pub fn add_backup_keys_route( +pub async fn add_backup_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -136,6 +142,8 @@ pub fn add_backup_keys_route( } } + db.flush().await?; + Ok(add_backup_keys::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -148,7 +156,7 @@ pub fn add_backup_keys_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/keys/<_>", data = "") )] -pub fn add_backup_key_sessions_route( +pub async fn add_backup_key_sessions_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -165,6 +173,8 @@ pub fn add_backup_key_sessions_route( )? } + db.flush().await?; + Ok(add_backup_key_sessions::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -177,7 +187,7 @@ pub fn add_backup_key_sessions_route( feature = "conduit_bin", put("/_matrix/client/unstable/room_keys/keys/<_>/<_>", data = "") )] -pub fn add_backup_key_session_route( +pub async fn add_backup_key_session_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -192,6 +202,8 @@ pub fn add_backup_key_session_route( &db.globals, )?; + db.flush().await?; + Ok(add_backup_key_session::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -203,7 +215,7 @@ pub fn add_backup_key_session_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/keys", data = "") )] -pub fn get_backup_keys_route( +pub async fn get_backup_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -218,7 +230,7 @@ pub fn get_backup_keys_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/keys/<_>", data = "") )] -pub fn get_backup_key_sessions_route( +pub async fn get_backup_key_sessions_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -235,7 +247,7 @@ pub fn get_backup_key_sessions_route( feature = "conduit_bin", get("/_matrix/client/unstable/room_keys/keys/<_>/<_>", data = "") )] -pub fn get_backup_key_session_route( +pub async fn get_backup_key_session_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -252,7 +264,7 @@ pub fn get_backup_key_session_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/keys", data = "") )] -pub fn delete_backup_keys_route( +pub async fn delete_backup_keys_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -261,6 +273,8 @@ pub fn delete_backup_keys_route( db.key_backups .delete_all_keys(&sender_user, &body.version)?; + db.flush().await?; + Ok(delete_backup_keys::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -272,7 +286,7 @@ pub fn delete_backup_keys_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/keys/<_>", data = "") )] -pub fn delete_backup_key_sessions_route( +pub async fn delete_backup_key_sessions_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -281,6 +295,8 @@ pub fn delete_backup_key_sessions_route( db.key_backups .delete_room_keys(&sender_user, &body.version, &body.room_id)?; + db.flush().await?; + Ok(delete_backup_key_sessions::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, @@ -292,7 +308,7 @@ pub fn delete_backup_key_sessions_route( feature = "conduit_bin", delete("/_matrix/client/unstable/room_keys/keys/<_>/<_>", data = "") )] -pub fn delete_backup_key_session_route( +pub async fn delete_backup_key_session_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -301,6 +317,8 @@ pub fn delete_backup_key_session_route( db.key_backups .delete_room_key(&sender_user, &body.version, &body.room_id, &body.session_id)?; + db.flush().await?; + Ok(delete_backup_key_session::Response { count: (db.key_backups.count_keys(sender_user, &body.version)? as u32).into(), etag: db.key_backups.get_etag(sender_user, &body.version)?, diff --git a/src/client_server/capabilities.rs b/src/client_server/capabilities.rs index ddf90f8..54c08ba 100644 --- a/src/client_server/capabilities.rs +++ b/src/client_server/capabilities.rs @@ -9,7 +9,7 @@ use rocket::get; /// /// Get information on this server's supported feature set and other relevent capabilities. #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/capabilities"))] -pub fn get_capabilities_route() -> ConduitResult { +pub async fn get_capabilities_route() -> ConduitResult { let mut available = BTreeMap::new(); available.insert( RoomVersionId::Version5, diff --git a/src/client_server/config.rs b/src/client_server/config.rs index adff05a..dd8de64 100644 --- a/src/client_server/config.rs +++ b/src/client_server/config.rs @@ -16,7 +16,7 @@ use rocket::{get, put}; feature = "conduit_bin", put("/_matrix/client/r0/user/<_>/account_data/<_>", data = "") )] -pub fn set_global_account_data_route( +pub async fn set_global_account_data_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -40,6 +40,8 @@ pub fn set_global_account_data_route( &db.globals, )?; + db.flush().await?; + Ok(set_global_account_data::Response.into()) } @@ -47,7 +49,7 @@ pub fn set_global_account_data_route( feature = "conduit_bin", get("/_matrix/client/r0/user/<_>/account_data/<_>", data = "") )] -pub fn get_global_account_data_route( +pub async fn get_global_account_data_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -58,5 +60,7 @@ pub fn get_global_account_data_route( .get::>(None, sender_user, body.event_type.clone().into())? .ok_or(Error::BadRequest(ErrorKind::NotFound, "Data not found."))?; + db.flush().await?; + Ok(get_global_account_data::Response { account_data: data }.into()) } diff --git a/src/client_server/context.rs b/src/client_server/context.rs index a1b848a..f2a8cd4 100644 --- a/src/client_server/context.rs +++ b/src/client_server/context.rs @@ -10,7 +10,7 @@ use rocket::get; feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/context/<_>", data = "") )] -pub fn get_context_route( +pub async fn get_context_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/device.rs b/src/client_server/device.rs index 233d233..86ac511 100644 --- a/src/client_server/device.rs +++ b/src/client_server/device.rs @@ -16,7 +16,7 @@ use rocket::{delete, get, post, put}; feature = "conduit_bin", get("/_matrix/client/r0/devices", data = "") )] -pub fn get_devices_route( +pub async fn get_devices_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -35,7 +35,7 @@ pub fn get_devices_route( feature = "conduit_bin", get("/_matrix/client/r0/devices/<_>", data = "") )] -pub fn get_device_route( +pub async fn get_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -53,7 +53,7 @@ pub fn get_device_route( feature = "conduit_bin", put("/_matrix/client/r0/devices/<_>", data = "") )] -pub fn update_device_route( +pub async fn update_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -69,6 +69,8 @@ pub fn update_device_route( db.users .update_device_metadata(&sender_user, &body.device_id, &device)?; + db.flush().await?; + Ok(update_device::Response.into()) } @@ -76,7 +78,7 @@ pub fn update_device_route( feature = "conduit_bin", delete("/_matrix/client/r0/devices/<_>", data = "") )] -pub fn delete_device_route( +pub async fn delete_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -115,6 +117,8 @@ pub fn delete_device_route( db.users.remove_device(&sender_user, &body.device_id)?; + db.flush().await?; + Ok(delete_device::Response.into()) } @@ -122,7 +126,7 @@ pub fn delete_device_route( feature = "conduit_bin", post("/_matrix/client/r0/delete_devices", data = "") )] -pub fn delete_devices_route( +pub async fn delete_devices_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -163,5 +167,7 @@ pub fn delete_devices_route( db.users.remove_device(&sender_user, &device_id)? } + db.flush().await?; + Ok(delete_devices::Response.into()) } diff --git a/src/client_server/directory.rs b/src/client_server/directory.rs index c82a15f..202417e 100644 --- a/src/client_server/directory.rs +++ b/src/client_server/directory.rs @@ -87,6 +87,8 @@ pub async fn set_room_visibility_route( room::Visibility::Private => db.rooms.set_public(&body.room_id, false)?, } + db.flush().await?; + Ok(set_room_visibility::Response.into()) } diff --git a/src/client_server/filter.rs b/src/client_server/filter.rs index 4b1c3a0..b6dc583 100644 --- a/src/client_server/filter.rs +++ b/src/client_server/filter.rs @@ -5,7 +5,7 @@ use ruma::api::client::r0::filter::{self, create_filter, get_filter}; use rocket::{get, post}; #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/user/<_>/filter/<_>"))] -pub fn get_filter_route() -> ConduitResult { +pub async fn get_filter_route() -> ConduitResult { // TODO Ok(get_filter::Response::new(filter::IncomingFilterDefinition { event_fields: None, @@ -18,7 +18,7 @@ pub fn get_filter_route() -> ConduitResult { } #[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/user/<_>/filter"))] -pub fn create_filter_route() -> ConduitResult { +pub async fn create_filter_route() -> ConduitResult { // TODO Ok(create_filter::Response::new(utils::random_string(10)).into()) } diff --git a/src/client_server/keys.rs b/src/client_server/keys.rs index 2af88cf..58c79da 100644 --- a/src/client_server/keys.rs +++ b/src/client_server/keys.rs @@ -22,7 +22,7 @@ use rocket::{get, post}; feature = "conduit_bin", post("/_matrix/client/r0/keys/upload", data = "") )] -pub fn upload_keys_route( +pub async fn upload_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -58,6 +58,8 @@ pub fn upload_keys_route( } } + db.flush().await?; + Ok(upload_keys::Response { one_time_key_counts: db.users.count_one_time_keys(sender_user, sender_device)?, } @@ -68,7 +70,7 @@ pub fn upload_keys_route( feature = "conduit_bin", post("/_matrix/client/r0/keys/query", data = "") )] -pub fn get_keys_route( +pub async fn get_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -148,7 +150,7 @@ pub fn get_keys_route( feature = "conduit_bin", post("/_matrix/client/r0/keys/claim", data = "") )] -pub fn claim_keys_route( +pub async fn claim_keys_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -168,6 +170,8 @@ pub fn claim_keys_route( one_time_keys.insert(user_id.clone(), container); } + db.flush().await?; + Ok(claim_keys::Response { failures: BTreeMap::new(), one_time_keys, @@ -179,7 +183,7 @@ pub fn claim_keys_route( feature = "conduit_bin", post("/_matrix/client/unstable/keys/device_signing/upload", data = "") )] -pub fn upload_signing_keys_route( +pub async fn upload_signing_keys_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -227,6 +231,8 @@ pub fn upload_signing_keys_route( )?; } + db.flush().await?; + Ok(upload_signing_keys::Response.into()) } @@ -234,7 +240,7 @@ pub fn upload_signing_keys_route( feature = "conduit_bin", post("/_matrix/client/unstable/keys/signatures/upload", data = "") )] -pub fn upload_signatures_route( +pub async fn upload_signatures_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -285,6 +291,8 @@ pub fn upload_signatures_route( } } + db.flush().await?; + Ok(upload_signatures::Response.into()) } @@ -292,7 +300,7 @@ pub fn upload_signatures_route( feature = "conduit_bin", get("/_matrix/client/r0/keys/changes", data = "") )] -pub fn get_key_changes_route( +pub async fn get_key_changes_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/media.rs b/src/client_server/media.rs index 551546b..96874cc 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -14,7 +14,7 @@ use std::convert::TryInto; const MXC_LENGTH: usize = 32; #[cfg_attr(feature = "conduit_bin", get("/_matrix/media/r0/config"))] -pub fn get_media_config_route( +pub async fn get_media_config_route( db: State<'_, Database>, ) -> ConduitResult { Ok(get_media_config::Response { @@ -27,7 +27,7 @@ pub fn get_media_config_route( feature = "conduit_bin", post("/_matrix/media/r0/upload", data = "") )] -pub fn create_content_route( +pub async fn create_content_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -43,6 +43,8 @@ pub fn create_content_route( &body.file, )?; + db.flush().await?; + Ok(create_content::Response { content_uri: mxc }.into()) } diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 5d028d9..3380601 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -65,17 +65,19 @@ pub async fn join_room_by_id_or_alias_route( } }; + let join_room_response = join_room_by_id_helper( + &db, + body.sender_user.as_ref(), + &room_id, + &servers, + body.third_party_signed.as_ref(), + ) + .await?; + + db.flush().await?; + Ok(join_room_by_id_or_alias::Response { - room_id: join_room_by_id_helper( - &db, - body.sender_user.as_ref(), - &room_id, - &servers, - body.third_party_signed.as_ref(), - ) - .await? - .0 - .room_id, + room_id: join_room_response.0.room_id, } .into()) } @@ -124,6 +126,8 @@ pub async fn leave_room_route( &db.account_data, )?; + db.flush().await?; + Ok(leave_room::Response::new().into()) } @@ -160,6 +164,8 @@ pub async fn invite_user_route( &db.account_data, )?; + db.flush().await?; + Ok(invite_user::Response.into()) } else { Err(Error::BadRequest(ErrorKind::NotFound, "User not found.")) @@ -211,6 +217,8 @@ pub async fn kick_user_route( &db.account_data, )?; + db.flush().await?; + Ok(kick_user::Response::new().into()) } @@ -267,6 +275,8 @@ pub async fn ban_user_route( &db.account_data, )?; + db.flush().await?; + Ok(ban_user::Response::new().into()) } @@ -314,6 +324,8 @@ pub async fn unban_user_route( &db.account_data, )?; + db.flush().await?; + Ok(unban_user::Response::new().into()) } @@ -321,7 +333,7 @@ pub async fn unban_user_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/forget", data = "") )] -pub fn forget_room_route( +pub async fn forget_room_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -329,6 +341,8 @@ pub fn forget_room_route( db.rooms.forget(&body.room_id, &sender_user)?; + db.flush().await?; + Ok(forget_room::Response::new().into()) } @@ -336,7 +350,7 @@ pub fn forget_room_route( feature = "conduit_bin", get("/_matrix/client/r0/joined_rooms", data = "") )] -pub fn joined_rooms_route( +pub async fn joined_rooms_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -356,7 +370,7 @@ pub fn joined_rooms_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/members", data = "") )] -pub fn get_member_events_route( +pub async fn get_member_events_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -384,7 +398,7 @@ pub fn get_member_events_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/joined_members", data = "") )] -pub fn joined_members_route( +pub async fn joined_members_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 9b038bf..f9c8ba1 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -77,6 +77,8 @@ pub async fn send_message_event_route( event_id.as_bytes(), )?; + db.flush().await?; + Ok(send_message_event::Response::new(event_id).into()) } @@ -84,7 +86,7 @@ pub async fn send_message_event_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/messages", data = "") )] -pub fn get_message_events_route( +pub async fn get_message_events_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/mod.rs b/src/client_server/mod.rs index e5a36f3..672957b 100644 --- a/src/client_server/mod.rs +++ b/src/client_server/mod.rs @@ -75,6 +75,6 @@ const SESSION_ID_LENGTH: usize = 256; #[cfg(feature = "conduit_bin")] #[options("/<_..>")] -pub fn options_route() -> ConduitResult { +pub async fn options_route() -> ConduitResult { Ok(send_event_to_device::Response.into()) } diff --git a/src/client_server/presence.rs b/src/client_server/presence.rs index c529932..e597c69 100644 --- a/src/client_server/presence.rs +++ b/src/client_server/presence.rs @@ -10,7 +10,7 @@ use rocket::put; feature = "conduit_bin", put("/_matrix/client/r0/presence/<_>/status", data = "") )] -pub fn set_presence_route( +pub async fn set_presence_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -41,5 +41,7 @@ pub fn set_presence_route( )?; } + db.flush().await?; + Ok(set_presence::Response.into()) } diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index d6b9212..d754ace 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -90,6 +90,8 @@ pub async fn set_displayname_route( )?; } + db.flush().await?; + Ok(set_display_name::Response.into()) } @@ -97,7 +99,7 @@ pub async fn set_displayname_route( feature = "conduit_bin", get("/_matrix/client/r0/profile/<_>/displayname", data = "") )] -pub fn get_displayname_route( +pub async fn get_displayname_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -182,6 +184,8 @@ pub async fn set_avatar_url_route( )?; } + db.flush().await?; + Ok(set_avatar_url::Response.into()) } @@ -189,7 +193,7 @@ pub async fn set_avatar_url_route( feature = "conduit_bin", get("/_matrix/client/r0/profile/<_>/avatar_url", data = "") )] -pub fn get_avatar_url_route( +pub async fn get_avatar_url_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -203,7 +207,7 @@ pub fn get_avatar_url_route( feature = "conduit_bin", get("/_matrix/client/r0/profile/<_>", data = "") )] -pub fn get_profile_route( +pub async fn get_profile_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/push.rs b/src/client_server/push.rs index 568d30c..05ba8d0 100644 --- a/src/client_server/push.rs +++ b/src/client_server/push.rs @@ -16,7 +16,7 @@ use rocket::{get, post, put}; feature = "conduit_bin", get("/_matrix/client/r0/pushrules", data = "") )] -pub fn get_pushrules_all_route( +pub async fn get_pushrules_all_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -40,11 +40,15 @@ pub fn get_pushrules_all_route( "/_matrix/client/r0/pushrules/<_>/<_>/<_>", //data = "" ))] -pub fn set_pushrule_route(//db: State<'_, Database>, +pub async fn set_pushrule_route( + db: State<'_, Database>, //body: Ruma, ) -> ConduitResult { // TODO warn!("TODO: set_pushrule_route"); + + db.flush().await?; + Ok(set_pushrule::Response.into()) } @@ -52,14 +56,19 @@ pub fn set_pushrule_route(//db: State<'_, Database>, feature = "conduit_bin", put("/_matrix/client/r0/pushrules/<_>/<_>/<_>/enabled") )] -pub fn set_pushrule_enabled_route() -> ConduitResult { +pub async fn set_pushrule_enabled_route( + db: State<'_, Database>, +) -> ConduitResult { // TODO warn!("TODO: set_pushrule_enabled_route"); + + db.flush().await?; + Ok(set_pushrule_enabled::Response.into()) } #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/pushers"))] -pub fn get_pushers_route() -> ConduitResult { +pub async fn get_pushers_route() -> ConduitResult { Ok(get_pushers::Response { pushers: Vec::new(), } @@ -67,7 +76,9 @@ pub fn get_pushers_route() -> ConduitResult { } #[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/pushers/set"))] -pub fn set_pushers_route() -> ConduitResult { +pub async fn set_pushers_route(db: State<'_, Database>) -> ConduitResult { + db.flush().await?; + Ok(get_pushers::Response { pushers: Vec::new(), } diff --git a/src/client_server/read_marker.rs b/src/client_server/read_marker.rs index 77b4141..f3e7211 100644 --- a/src/client_server/read_marker.rs +++ b/src/client_server/read_marker.rs @@ -13,7 +13,7 @@ use std::{collections::BTreeMap, time::SystemTime}; feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/read_markers", data = "") )] -pub fn set_read_marker_route( +pub async fn set_read_marker_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -71,5 +71,8 @@ pub fn set_read_marker_route( &db.globals, )?; } + + db.flush().await?; + Ok(set_read_marker::Response.into()) } diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index b4fc4bb..486eb6c 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -36,5 +36,7 @@ pub async fn redact_event_route( &db.account_data, )?; + db.flush().await?; + Ok(redact_event::Response { event_id }.into()) } diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 92d8b8e..d1d051f 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -313,6 +313,8 @@ pub async fn create_room_route( db.rooms.set_public(&room_id, true)?; } + db.flush().await?; + Ok(create_room::Response::new(room_id).into()) } @@ -320,7 +322,7 @@ pub async fn create_room_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/event/<_>", data = "") )] -pub fn get_room_event_route( +pub async fn get_room_event_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -531,6 +533,8 @@ pub async fn upgrade_room_route( &db.account_data, )?; + db.flush().await?; + // Return the replacement room id Ok(upgrade_room::Response { replacement_room }.into()) } diff --git a/src/client_server/search.rs b/src/client_server/search.rs index 6e2b7ff..0950b25 100644 --- a/src/client_server/search.rs +++ b/src/client_server/search.rs @@ -11,7 +11,7 @@ use std::collections::BTreeMap; feature = "conduit_bin", post("/_matrix/client/r0/search", data = "") )] -pub fn search_events_route( +pub async fn search_events_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/session.rs b/src/client_server/session.rs index f10bf71..c8775ef 100644 --- a/src/client_server/session.rs +++ b/src/client_server/session.rs @@ -16,7 +16,7 @@ use rocket::{get, post}; /// Get the homeserver's supported login types. One of these should be used as the `type` field /// when logging in. #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/login"))] -pub fn get_login_types_route() -> ConduitResult { +pub async fn get_login_types_route() -> ConduitResult { Ok(get_login_types::Response::new(vec![get_login_types::LoginType::Password]).into()) } @@ -34,7 +34,7 @@ pub fn get_login_types_route() -> ConduitResult { feature = "conduit_bin", post("/_matrix/client/r0/login", data = "") )] -pub fn login_route( +pub async fn login_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -93,6 +93,8 @@ pub fn login_route( body.initial_device_display_name.clone(), )?; + db.flush().await?; + Ok(login::Response { user_id, access_token: token, @@ -113,7 +115,7 @@ pub fn login_route( feature = "conduit_bin", post("/_matrix/client/r0/logout", data = "") )] -pub fn logout_route( +pub async fn logout_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -122,6 +124,8 @@ pub fn logout_route( db.users.remove_device(&sender_user, sender_device)?; + db.flush().await?; + Ok(logout::Response::new().into()) } @@ -138,7 +142,7 @@ pub fn logout_route( feature = "conduit_bin", post("/_matrix/client/r0/logout/all", data = "") )] -pub fn logout_all_route( +pub async fn logout_all_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -150,5 +154,7 @@ pub fn logout_all_route( } } + db.flush().await?; + Ok(logout_all::Response::new().into()) } diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 90abac7..eae96b5 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -37,18 +37,19 @@ pub async fn send_state_event_for_key_route( ) .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?; - Ok(send_state_event_for_key::Response::new( - send_state_event_for_key_helper( - &db, - sender_user, - &body.content, - content, - &body.room_id, - Some(body.state_key.to_owned()), - ) - .await?, + let event_id = send_state_event_for_key_helper( + &db, + sender_user, + &body.content, + content, + &body.room_id, + Some(body.state_key.to_owned()), ) - .into()) + .await?; + + db.flush().await?; + + Ok(send_state_event_for_key::Response { event_id }.into()) } #[cfg_attr( @@ -75,27 +76,28 @@ pub async fn send_state_event_for_empty_key_route( ) .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))?; - Ok(send_state_event_for_empty_key::Response::new( - send_state_event_for_key_helper( - &db, - sender_user - .as_ref() - .expect("no user for send state empty key rout"), - &body.content, - json, - &body.room_id, - Some("".into()), - ) - .await?, + let event_id = send_state_event_for_key_helper( + &db, + sender_user + .as_ref() + .expect("no user for send state empty key rout"), + &body.content, + json, + &body.room_id, + Some("".into()), ) - .into()) + .await?; + + db.flush().await?; + + Ok(send_state_event_for_empty_key::Response { event_id }.into()) } #[cfg_attr( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/state", data = "") )] -pub fn get_state_events_route( +pub async fn get_state_events_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -140,7 +142,7 @@ pub fn get_state_events_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/state/<_>/<_>", data = "") )] -pub fn get_state_events_for_key_route( +pub async fn get_state_events_for_key_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { @@ -189,7 +191,7 @@ pub fn get_state_events_for_key_route( feature = "conduit_bin", get("/_matrix/client/r0/rooms/<_>/state/<_>", data = "") )] -pub fn get_state_events_for_empty_key_route( +pub async fn get_state_events_for_empty_key_route( db: State<'_, Database>, body: Ruma, ) -> ConduitResult { diff --git a/src/client_server/tag.rs b/src/client_server/tag.rs index c605313..7bbf9e8 100644 --- a/src/client_server/tag.rs +++ b/src/client_server/tag.rs @@ -13,7 +13,7 @@ use rocket::{delete, get, put}; feature = "conduit_bin", put("/_matrix/client/r0/user/<_>/rooms/<_>/tags/<_>", data = "") )] -pub fn update_tag_route( +pub async fn update_tag_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -40,6 +40,8 @@ pub fn update_tag_route( &db.globals, )?; + db.flush().await?; + Ok(create_tag::Response.into()) } @@ -47,7 +49,7 @@ pub fn update_tag_route( feature = "conduit_bin", delete("/_matrix/client/r0/user/<_>/rooms/<_>/tags/<_>", data = "") )] -pub fn delete_tag_route( +pub async fn delete_tag_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -71,6 +73,8 @@ pub fn delete_tag_route( &db.globals, )?; + db.flush().await?; + Ok(delete_tag::Response.into()) } @@ -78,7 +82,7 @@ pub fn delete_tag_route( feature = "conduit_bin", get("/_matrix/client/r0/user/<_>/rooms/<_>/tags", data = "") )] -pub fn get_tags_route( +pub async fn get_tags_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/thirdparty.rs b/src/client_server/thirdparty.rs index d9b540b..c775e9b 100644 --- a/src/client_server/thirdparty.rs +++ b/src/client_server/thirdparty.rs @@ -10,7 +10,7 @@ use std::collections::BTreeMap; feature = "conduit_bin", get("/_matrix/client/r0/thirdparty/protocols") )] -pub fn get_protocols_route() -> ConduitResult { +pub async fn get_protocols_route() -> ConduitResult { warn!("TODO: get_protocols_route"); Ok(get_protocols::Response { protocols: BTreeMap::new(), diff --git a/src/client_server/to_device.rs b/src/client_server/to_device.rs index 6719dae..8cc3e29 100644 --- a/src/client_server/to_device.rs +++ b/src/client_server/to_device.rs @@ -12,7 +12,7 @@ use rocket::put; feature = "conduit_bin", put("/_matrix/client/r0/sendToDevice/<_>/<_>", data = "") )] -pub fn send_event_to_device_route( +pub async fn send_event_to_device_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -66,5 +66,7 @@ pub fn send_event_to_device_route( db.transaction_ids .add_txnid(sender_user, sender_device, &body.txn_id, &[])?; + db.flush().await?; + Ok(send_event_to_device::Response.into()) } diff --git a/src/client_server/unversioned.rs b/src/client_server/unversioned.rs index ea7f633..e51ed56 100644 --- a/src/client_server/unversioned.rs +++ b/src/client_server/unversioned.rs @@ -15,7 +15,7 @@ use rocket::get; /// Note: Unstable features are used while developing new features. Clients should avoid using /// unstable features in their stable releases #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/versions"))] -pub fn get_supported_versions_route() -> ConduitResult { +pub async fn get_supported_versions_route() -> ConduitResult { let mut resp = get_supported_versions::Response::new(vec!["r0.5.0".to_owned(), "r0.6.0".to_owned()]); diff --git a/src/client_server/user_directory.rs b/src/client_server/user_directory.rs index dcf48fe..5829364 100644 --- a/src/client_server/user_directory.rs +++ b/src/client_server/user_directory.rs @@ -9,7 +9,7 @@ use rocket::post; feature = "conduit_bin", post("/_matrix/client/r0/user_directory/search", data = "") )] -pub fn search_users_route( +pub async fn search_users_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { diff --git a/src/client_server/voip.rs b/src/client_server/voip.rs index 33080ea..a8db62a 100644 --- a/src/client_server/voip.rs +++ b/src/client_server/voip.rs @@ -5,7 +5,7 @@ use ruma::api::client::{error::ErrorKind, r0::message::send_message_event}; use rocket::get; #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/voip/turnServer"))] -pub fn turn_server_route() -> ConduitResult { +pub async fn turn_server_route() -> ConduitResult { Err(Error::BadRequest( ErrorKind::NotFound, "There is no turn server yet.", diff --git a/src/database.rs b/src/database.rs index 4b2cba1..6bdc32a 100644 --- a/src/database.rs +++ b/src/database.rs @@ -227,4 +227,9 @@ impl Database { // Wait until one of them finds something futures.next().await; } + + pub async fn flush(&self) -> Result<()> { + self._db.flush_async().await?; + Ok(()) + } }