From b4f79b77ba37c875c9c9c78cf8a03b1eeda83d64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 13 Apr 2021 15:00:45 +0200 Subject: [PATCH] feat: reject invites over federation --- Cargo.lock | 106 +++++------ Cargo.toml | 6 +- src/appservice_server.rs | 17 +- src/client_server/membership.rs | 35 +--- src/client_server/sync.rs | 95 ++-------- src/database.rs | 5 +- src/database/pusher.rs | 17 +- src/database/rooms.rs | 299 +++++++++++++++++++++++++++++--- src/ruma_wrapper.rs | 14 +- src/server_server.rs | 38 ++-- 10 files changed, 391 insertions(+), 241 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 42042b6..d3da6fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,9 +457,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1" +checksum = "a9d5813545e459ad3ca1bff9915e9ad7f1a47dc6a91b627ce321d5863b7dd253" dependencies = [ "futures-channel", "futures-core", @@ -472,9 +472,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939" +checksum = "ce79c6a52a299137a6013061e0cf0e688fce5d7f1bc60125f520912fdb29ec25" dependencies = [ "futures-core", "futures-sink", @@ -482,15 +482,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94" +checksum = "098cd1c6dda6ca01650f1a37a794245eb73181d0d4d4e955e2f3c37db7af1815" [[package]] name = "futures-executor" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1" +checksum = "10f6cb7042eda00f0049b1d2080aa4b93442997ee507eb3828e8bd7577f94c9d" dependencies = [ "futures-core", "futures-task", @@ -499,15 +499,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59" +checksum = "365a1a1fb30ea1c03a830fdb2158f5236833ac81fa0ad12fe35b29cddc35cb04" [[package]] name = "futures-macro" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7" +checksum = "668c6733a182cd7deb4f1de7ba3bf2120823835b3bcfbeacf7d2c4a773c1bb8b" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -517,21 +517,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3" +checksum = "5c5629433c555de3d82861a7a4e3794a4c40040390907cfbfd7143a92a426c23" [[package]] name = "futures-task" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80" +checksum = "ba7aa51095076f3ba6d9a1f702f74bd05ec65f555d70d2033d55ba8d69f581bc" [[package]] name = "futures-util" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1" +checksum = "3c144ad54d60f23927f0a6b6d816e4271278b64f005ad65e4e35291d2de9c025" dependencies = [ "futures-channel", "futures-core", @@ -650,9 +650,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7245cd7449cc792608c3c8a9eaf69bd4eabbabf802713748fd739c98b82f0747" +checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" dependencies = [ "bytes", "fnv", @@ -672,9 +672,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.3.5" +version = "1.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "615caabe2c3160b313d52ccc905335f4ed5f10881dd63dc5699d47e90be85691" +checksum = "bc35c995b9d93ec174cf9a27d425c7892722101e14993cd227fdb51d70cf9589" [[package]] name = "httpdate" @@ -1497,9 +1497,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf12057f289428dbf5c591c74bf10392e4a8003f993405a902f20117019022d4" +checksum = "2296f2fac53979e8ccbc4a1136b25dcefd37be9ed7e4a1f6b05a6029c84ff124" dependencies = [ "base64 0.13.0", "bytes", @@ -1625,7 +1625,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "assign", "js_int", @@ -1645,8 +1645,9 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ + "bytes", "http", "percent-encoding", "ruma-api-macros", @@ -1660,7 +1661,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1671,7 +1672,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "ruma-api", "ruma-common", @@ -1685,9 +1686,10 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "assign", + "bytes", "http", "js_int", "maplit", @@ -1704,7 +1706,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.3.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "indexmap", "js_int", @@ -1720,7 +1722,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "js_int", "ruma-common", @@ -1734,7 +1736,7 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = "0.22.0-alpha.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1745,7 +1747,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.1.0-alpha.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "js_int", "ruma-api", @@ -1760,7 +1762,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.18.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "paste", "rand", @@ -1774,7 +1776,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.18.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "proc-macro2", "quote", @@ -1785,12 +1787,12 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.2.2" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" [[package]] name = "ruma-identity-service-api" version = "0.0.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "ruma-api", "ruma-common", @@ -1803,7 +1805,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.0.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "js_int", "ruma-api", @@ -1818,7 +1820,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.3.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "form_urlencoded", "itoa", @@ -1831,7 +1833,7 @@ dependencies = [ [[package]] name = "ruma-serde-macros" version = "0.3.1" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1842,7 +1844,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0" -source = "git+https://github.com/timokoesters/ruma?rev=b11de1e1f9d3c15267d09617131cf217f8277fa4#b11de1e1f9d3c15267d09617131cf217f8277fa4" +source = "git+https://github.com/ruma/ruma?rev=6394609feb4af5c43b840fab85b824b13cebb156#6394609feb4af5c43b840fab85b824b13cebb156" dependencies = [ "base64 0.13.0", "ring", @@ -1910,9 +1912,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "sct" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" dependencies = [ "ring", "untrusted", @@ -2120,7 +2122,7 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/timokoesters/state-res?rev=2e90b36babeb0d6b99ce8d4b513302a25dcdffc1#2e90b36babeb0d6b99ce8d4b513302a25dcdffc1" +source = "git+https://github.com/timokoesters/state-res?rev=94534b8ff3e71b544ae36206abc182321e9d41f1#94534b8ff3e71b544ae36206abc182321e9d41f1" dependencies = [ "itertools 0.10.0", "log", @@ -2182,9 +2184,9 @@ checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" [[package]] name = "syn" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ce15dd3ed8aa2f8eeac4716d6ef5ab58b6b9256db41d7e1a0224c2788e8fd87" +checksum = "48fe99c6bd8b1cc636890bcc071842de909d902c81ac7dab53ba33c421ab8ffb" dependencies = [ "proc-macro2", "quote", @@ -2330,9 +2332,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134af885d758d645f0f0505c9a8b3f9bf8a348fd822e112ab5248138348f1722" +checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" dependencies = [ "autocfg", "bytes", @@ -2381,9 +2383,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5143d049e85af7fbc36f5454d990e62c2df705b3589f123b71f441b6b59f443f" +checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e" dependencies = [ "bytes", "futures-core", @@ -2558,9 +2560,9 @@ dependencies = [ [[package]] name = "uncased" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "300932469d646d39929ffe84ad5c1837beecf602519ef5695e485b472de4082b" +checksum = "5baeed7327e25054889b9bd4f975f32e5f4c5d434042d59ab6cd4142c0a76ed0" dependencies = [ "version_check", ] diff --git a/Cargo.toml b/Cargo.toml index a28c08d..84e40d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,12 +18,12 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "93e62c86e #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -#ruma = { git = "https://github.com/ruma/ruma", rev = "a310ccc318a4eb51062923d570d5a86c1468e8a1", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "unstable-pre-spec", "unstable-exhaustive-types"] } -ruma = { git = "https://github.com/timokoesters/ruma", rev = "b11de1e1f9d3c15267d09617131cf217f8277fa4", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "unstable-pre-spec", "unstable-exhaustive-types"] } +ruma = { git = "https://github.com/ruma/ruma", rev = "6394609feb4af5c43b840fab85b824b13cebb156", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "unstable-pre-spec", "unstable-exhaustive-types"] } +#ruma = { git = "https://github.com/timokoesters/ruma", rev = "220d5b4a76b3b781f7f8297fbe6b14473b04214b", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { path = "../ruma/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "unstable-pre-spec", "unstable-exhaustive-types"] } # Used when doing state resolution -state-res = { git = "https://github.com/timokoesters/state-res", rev = "2e90b36babeb0d6b99ce8d4b513302a25dcdffc1", features = ["unstable-pre-spec"] } +state-res = { git = "https://github.com/timokoesters/state-res", rev = "94534b8ff3e71b544ae36206abc182321e9d41f1", features = ["unstable-pre-spec"] } #state-res = { path = "../state-res", features = ["unstable-pre-spec"] } # Used for long polling and federation sender, should be the same as rocket::tokio diff --git a/src/appservice_server.rs b/src/appservice_server.rs index 04f14c0..1b72c76 100644 --- a/src/appservice_server.rs +++ b/src/appservice_server.rs @@ -1,7 +1,7 @@ use crate::{utils, Error, Result}; use http::header::{HeaderValue, CONTENT_TYPE}; use log::warn; -use ruma::api::OutgoingRequest; +use ruma::api::{IncomingResponse, OutgoingRequest}; use std::{ convert::{TryFrom, TryInto}, fmt::Debug, @@ -66,15 +66,10 @@ where let status = reqwest_response.status(); - let body = reqwest_response - .bytes() - .await - .unwrap_or_else(|e| { - warn!("server error: {}", e); - Vec::new().into() - }) // TODO: handle timeout - .into_iter() - .collect::>(); + let body = reqwest_response.bytes().await.unwrap_or_else(|e| { + warn!("server error: {}", e); + Vec::new().into() + }); // TODO: handle timeout if status != 200 { warn!( @@ -86,7 +81,7 @@ where ); } - let response = T::IncomingResponse::try_from( + let response = T::IncomingResponse::try_from_http_response( http_response .body(body) .expect("reqwest body is valid http body"), diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index e816005..d491ca0 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -91,37 +91,7 @@ pub async fn leave_room_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let mut event = serde_json::from_value::>( - db.rooms - .room_state_get( - &body.room_id, - &EventType::RoomMember, - &sender_user.to_string(), - )? - .ok_or(Error::BadRequest( - ErrorKind::BadState, - "Cannot leave a room you are not a member of.", - ))? - .content, - ) - .expect("from_value::> can never fail") - .deserialize() - .map_err(|_| Error::bad_database("Invalid member event in database."))?; - - event.membership = member::MembershipState::Leave; - - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_user.to_string()), - redacts: None, - }, - &sender_user, - &body.room_id, - &db, - )?; + db.rooms.leave_room(sender_user, &body.room_id, &db).await?; db.flush().await?; @@ -480,6 +450,7 @@ async fn join_room_by_id_helper( Error::BadServerResponse("Invalid make_join event json received from server.") })?; + // TODO: Is origin needed? join_event_stub.insert( "origin".to_owned(), to_canonical_value(db.globals.server_name()) @@ -699,5 +670,7 @@ async fn join_room_by_id_helper( )?; } + db.flush().await?; + Ok(join_room_by_id::Response::new(room_id.clone()).into()) } diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index fe14208..66a1e13 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -1,6 +1,5 @@ use super::State; use crate::{ConduitResult, Database, Error, Ruma}; -use log::error; use ruma::{ api::client::r0::sync::sync_events, events::{room::member::MembershipState, AnySyncEphemeralRoomEvent, EventType}, @@ -494,83 +493,17 @@ pub async fn sync_events_route( } let mut left_rooms = BTreeMap::new(); - for room_id in db.rooms.rooms_left(&sender_user) { - let room_id = room_id?; + for result in db.rooms.rooms_left(&sender_user) { + let (room_id, left_state_events) = result?; + let left_count = db.rooms.get_left_count(&room_id, &sender_user)?; - let since_member = if let Some(since_member) = db - .rooms - .pdus_after(sender_user, &room_id, since) - .next() - .and_then(|pdu| pdu.ok()) - .and_then(|pdu| { - db.rooms - .pdu_shortstatehash(&pdu.1.event_id) - .ok()? - .ok_or_else(|| { - error!("{:?}", pdu.1); - Error::bad_database("Pdu in db doesn't have a state hash.") - }) - .ok() - }) - .and_then(|shortstatehash| { - db.rooms - .state_get(shortstatehash, &EventType::RoomMember, sender_user.as_str()) - .ok()? - .ok_or_else(|| Error::bad_database("State hash in db doesn't have a state.")) - .ok() - }) - .and_then(|pdu| { - serde_json::from_value::>( - pdu.content.clone(), - ) - .expect("Raw::from_value always works") - .deserialize() - .map_err(|_| Error::bad_database("Invalid PDU in database.")) - .map(|content| (pdu, content)) - .ok() - }) { - since_member - } else { - // We couldn't find the since_member event. This is very weird - we better abort + // Left before last sync + if Some(since) >= left_count { continue; - }; + } - let left_since_last_sync = since_member.1.membership == MembershipState::Join; - - let left_room = if left_since_last_sync { - device_list_left.extend( - db.rooms - .room_members(&room_id) - .filter_map(|user_id| Some(user_id.ok()?)) - .filter(|user_id| { - // Don't send key updates from the sender to the sender - sender_user != user_id - }) - .filter(|user_id| { - // Only send if the sender doesn't share any encrypted room with the target - // anymore - !share_encrypted_room(&db, sender_user, user_id, &room_id) - }), - ); - - let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?; - let mut room_events = pdus - .filter_map(|pdu| pdu.ok()) // Filter out buggy events - .take_while(|(_, pdu)| &since_member.0 != pdu) - .map(|(_, pdu)| pdu.to_sync_room_event()) - .collect::>(); - room_events.push(since_member.0.to_sync_room_event()); - - sync_events::LeftRoom { - account_data: sync_events::AccountData { events: Vec::new() }, - timeline: sync_events::Timeline { - limited: false, - prev_batch: Some(next_batch.clone()), - events: room_events, - }, - state: sync_events::State { events: Vec::new() }, - } - } else { + left_rooms.insert( + room_id.clone(), sync_events::LeftRoom { account_data: sync_events::AccountData { events: Vec::new() }, timeline: sync_events::Timeline { @@ -578,13 +511,11 @@ pub async fn sync_events_route( prev_batch: Some(next_batch.clone()), events: Vec::new(), }, - state: sync_events::State { events: Vec::new() }, - } - }; - - if !left_room.is_empty() { - left_rooms.insert(room_id.clone(), left_room); - } + state: sync_events::State { + events: left_state_events, + }, + }, + ); } let mut invited_rooms = BTreeMap::new(); diff --git a/src/database.rs b/src/database.rs index 9d629dd..6bb1b17 100644 --- a/src/database.rs +++ b/src/database.rs @@ -163,7 +163,8 @@ impl Database { roomuseroncejoinedids: db.open_tree("roomuseroncejoinedids")?, userroomid_invitestate: db.open_tree("userroomid_invitestate")?, roomuserid_invitecount: db.open_tree("roomuserid_invitecount")?, - userroomid_left: db.open_tree("userroomid_left")?, + userroomid_leftstate: db.open_tree("userroomid_leftstate")?, + roomuserid_leftcount: db.open_tree("roomuserid_leftcount")?, userroomid_notificationcount: db.open_tree("userroomid_notificationcount")?, userroomid_highlightcount: db.open_tree("userroomid_highlightcount")?, @@ -244,7 +245,7 @@ impl Database { .userroomid_invitestate .watch_prefix(&userid_prefix), ); - futures.push(self.rooms.userroomid_left.watch_prefix(&userid_prefix)); + futures.push(self.rooms.userroomid_leftstate.watch_prefix(&userid_prefix)); // Events for rooms we are in for room_id in self.rooms.rooms_joined(user_id).filter_map(|r| r.ok()) { diff --git a/src/database/pusher.rs b/src/database/pusher.rs index e2bd3f1..be30576 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -7,7 +7,7 @@ use ruma::{ self, v1::{Device, Notification, NotificationCounts, NotificationPriority}, }, - OutgoingRequest, + IncomingResponse, OutgoingRequest, }, events::{room::power_levels::PowerLevelsEventContent, EventType}, push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak}, @@ -129,15 +129,10 @@ where let status = reqwest_response.status(); - let body = reqwest_response - .bytes() - .await - .unwrap_or_else(|e| { - warn!("server error {}", e); - Vec::new().into() - }) // TODO: handle timeout - .into_iter() - .collect::>(); + let body = reqwest_response.bytes().await.unwrap_or_else(|e| { + warn!("server error {}", e); + Vec::new().into() + }); // TODO: handle timeout if status != 200 { info!( @@ -149,7 +144,7 @@ where ); } - let response = T::IncomingResponse::try_from( + let response = T::IncomingResponse::try_from_http_response( http_response .body(body) .expect("reqwest body is valid http body"), diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 3f37de6..caf7a09 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1,17 +1,18 @@ mod edus; pub use edus::RoomEdus; +use member::MembershipState; use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; use log::{debug, error, warn}; use regex::Regex; use ring::digest; use ruma::{ - api::client::error::ErrorKind, + api::{client::error::ErrorKind, federation}, events::{ ignored_user_list, push_rules, room::{create::CreateEventContent, member, message}, - AnyStrippedStateEvent, EventType, + AnyStrippedStateEvent, AnySyncStateEvent, EventType, }, push::{self, Action, Tweak}, serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw}, @@ -54,7 +55,8 @@ pub struct Rooms { pub(super) roomuseroncejoinedids: sled::Tree, pub(super) userroomid_invitestate: sled::Tree, // InviteState = Vec> pub(super) roomuserid_invitecount: sled::Tree, // InviteCount = Count - pub(super) userroomid_left: sled::Tree, + pub(super) userroomid_leftstate: sled::Tree, + pub(super) roomuserid_leftcount: sled::Tree, pub(super) userroomid_notificationcount: sled::Tree, // NotifyCount = u64 pub(super) userroomid_highlightcount: sled::Tree, // HightlightCount = u64 @@ -671,7 +673,7 @@ impl Rooms { .users .iter() .filter_map(|r| r.ok()) - .filter(|user_id| db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false)) + .filter(|user_id| self.is_joined(&user_id, &pdu.room_id).unwrap_or(false)) { // Don't notify the user of their own events if user == pdu.sender { @@ -782,9 +784,11 @@ impl Rooms { { state.push(e.to_stripped_state_event()); } - if let Some(e) = - self.room_state_get(&pdu.room_id, &EventType::RoomMember, pdu.sender.as_str())? - { + if let Some(e) = self.room_state_get( + &pdu.room_id, + &EventType::RoomMember, + pdu.sender.as_str(), + )? { state.push(e.to_stripped_state_event()); } @@ -1380,7 +1384,7 @@ impl Rooms { .state_key .as_ref() .map_or(false, |state_key| users.is_match(&state_key)) - || db.rooms.room_members(&room_id).any(|userid| { + || self.room_members(&room_id).any(|userid| { userid.map_or(false, |userid| users.is_match(userid.as_str())) }) }; @@ -1537,7 +1541,7 @@ impl Rooms { user_id: &UserId, membership: member::MembershipState, sender: &UserId, - invite_state: Option>>, + last_state: Option>>, account_data: &super::account_data::AccountData, globals: &super::globals::Globals, ) -> Result<()> { @@ -1643,7 +1647,8 @@ impl Rooms { self.roomuserid_joined.insert(&roomuser_id, &[])?; self.userroomid_invitestate.remove(&userroom_id)?; self.roomuserid_invitecount.remove(&roomuser_id)?; - self.userroomid_left.remove(&userroom_id)?; + self.userroomid_leftstate.remove(&userroom_id)?; + self.roomuserid_leftcount.remove(&roomuser_id)?; } member::MembershipState::Invite => { // We want to know if the sender is ignored by the receiver @@ -1664,14 +1669,15 @@ impl Rooms { self.roomserverids.insert(&roomserver_id, &[])?; self.userroomid_invitestate.insert( &userroom_id, - serde_json::to_vec(&invite_state.unwrap_or_default()) + serde_json::to_vec(&last_state.unwrap_or_default()) .expect("state to bytes always works"), )?; self.roomuserid_invitecount .insert(&roomuser_id, &globals.next_count()?.to_be_bytes())?; self.userroomid_joined.remove(&userroom_id)?; self.roomuserid_joined.remove(&roomuser_id)?; - self.userroomid_left.remove(&userroom_id)?; + self.userroomid_leftstate.remove(&userroom_id)?; + self.roomuserid_leftcount.remove(&roomuser_id)?; } member::MembershipState::Leave | member::MembershipState::Ban => { if self @@ -1682,7 +1688,12 @@ impl Rooms { { self.roomserverids.remove(&roomserver_id)?; } - self.userroomid_left.insert(&userroom_id, &[])?; + self.userroomid_leftstate.insert( + &userroom_id, + serde_json::to_vec(&Vec::>::new()).unwrap(), + )?; // TODO + self.roomuserid_leftcount + .insert(&roomuser_id, &globals.next_count()?.to_be_bytes())?; self.userroomid_joined.remove(&userroom_id)?; self.roomuserid_joined.remove(&roomuser_id)?; self.userroomid_invitestate.remove(&userroom_id)?; @@ -1694,13 +1705,191 @@ impl Rooms { Ok(()) } + pub async fn leave_room( + &self, + user_id: &UserId, + room_id: &RoomId, + db: &Database, + ) -> Result<()> { + // Ask a remote server if we don't have this room + if !self.exists(room_id)? && room_id.server_name() != db.globals.server_name() { + if let Err(e) = self.remote_leave_room(user_id, room_id, db).await { + warn!("Failed to leave room {} remotely: {}", user_id, e); + // Don't tell the client about this error + } + + let last_state = self + .invite_state(user_id, room_id)? + .map_or_else(|| self.left_state(user_id, room_id), |s| Ok(Some(s)))?; + + // We always drop the invite, we can't rely on other servers + self.update_membership( + room_id, + user_id, + MembershipState::Leave, + user_id, + last_state, + &db.account_data, + &db.globals, + )?; + } else { + let mut event = serde_json::from_value::>( + self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())? + .ok_or(Error::BadRequest( + ErrorKind::BadState, + "Cannot leave a room you are not a member of.", + ))? + .content, + ) + .expect("from_value::> can never fail") + .deserialize() + .map_err(|_| Error::bad_database("Invalid member event in database."))?; + + event.membership = member::MembershipState::Leave; + + self.build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(event) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + user_id, + room_id, + db, + )?; + } + + Ok(()) + } + + async fn remote_leave_room( + &self, + user_id: &UserId, + room_id: &RoomId, + db: &Database, + ) -> Result<()> { + let mut make_leave_response_and_server = Err(Error::BadServerResponse( + "No server available to assist in leaving.", + )); + + let invite_state = db + .rooms + .invite_state(user_id, room_id)? + .ok_or(Error::BadRequest( + ErrorKind::BadState, + "User is not invited.", + ))?; + + let servers = invite_state + .iter() + .filter_map(|event| { + serde_json::from_str::(&event.json().to_string()).ok() + }) + .filter_map(|event| event.get("sender").cloned()) + .filter_map(|sender| sender.as_str().map(|s| s.to_owned())) + .filter_map(|sender| UserId::try_from(sender).ok()) + .map(|user| user.server_name().to_owned()); + + for remote_server in servers { + let make_leave_response = db + .sending + .send_federation_request( + &db.globals, + &remote_server, + federation::membership::get_leave_event::v1::Request { room_id, user_id }, + ) + .await; + + make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server)); + + if make_leave_response_and_server.is_ok() { + break; + } + } + + let (make_leave_response, remote_server) = make_leave_response_and_server?; + + let room_version = match make_leave_response.room_version { + Some(room_version) if room_version == RoomVersionId::Version6 => room_version, + _ => return Err(Error::BadServerResponse("Room version is not supported")), + }; + + let mut leave_event_stub = + serde_json::from_str::(make_leave_response.event.json().get()) + .map_err(|_| { + Error::BadServerResponse("Invalid make_leave event json received from server.") + })?; + + // TODO: Is origin needed? + leave_event_stub.insert( + "origin".to_owned(), + to_canonical_value(db.globals.server_name()) + .map_err(|_| Error::bad_database("Invalid server name found"))?, + ); + leave_event_stub.insert( + "origin_server_ts".to_owned(), + to_canonical_value(utils::millis_since_unix_epoch()) + .expect("Timestamp is valid js_int value"), + ); + // We don't leave the event id in the pdu because that's only allowed in v1 or v2 rooms + leave_event_stub.remove("event_id"); + + // In order to create a compatible ref hash (EventID) the `hashes` field needs to be present + ruma::signatures::hash_and_sign_event( + db.globals.server_name().as_str(), + db.globals.keypair(), + &mut leave_event_stub, + &room_version, + ) + .expect("event is valid, we just created it"); + + // Generate event id + let event_id = EventId::try_from(&*format!( + "${}", + ruma::signatures::reference_hash(&leave_event_stub, &room_version) + .expect("ruma can calculate reference hashes") + )) + .expect("ruma's reference hashes are valid event ids"); + + // Add event_id back + leave_event_stub.insert( + "event_id".to_owned(), + to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"), + ); + + // It has enough fields to be called a proper event now + let leave_event = leave_event_stub; + + db.sending + .send_federation_request( + &db.globals, + &remote_server, + federation::membership::create_leave_event::v2::Request { + room_id, + event_id: &event_id, + pdu: PduEvent::convert_to_outgoing_federation_event(leave_event.clone()), + }, + ) + .await?; + + Ok(()) + } + /// Makes a user forget a room. pub fn forget(&self, room_id: &RoomId, user_id: &UserId) -> Result<()> { let mut userroom_id = user_id.as_bytes().to_vec(); userroom_id.push(0xff); userroom_id.extend_from_slice(room_id.as_bytes()); - self.userroomid_left.remove(userroom_id)?; + let mut roomuser_id = room_id.as_bytes().to_vec(); + roomuser_id.push(0xff); + roomuser_id.extend_from_slice(user_id.as_bytes()); + + self.userroomid_leftstate.remove(userroom_id)?; + self.roomuserid_leftcount.remove(roomuser_id)?; Ok(()) } @@ -1977,7 +2166,6 @@ impl Rooms { }) } - /// Returns an iterator over all invited members of a room. #[tracing::instrument(skip(self))] pub fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { let mut key = room_id.as_bytes().to_vec(); @@ -1993,6 +2181,21 @@ impl Rooms { }) } + #[tracing::instrument(skip(self))] + pub fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result> { + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(user_id.as_bytes()); + + self.roomuserid_leftcount + .get(key)? + .map_or(Ok(None), |bytes| { + Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| { + Error::bad_database("Invalid leftcount in db.") + })?)) + }) + } + /// Returns an iterator over all rooms this user joined. #[tracing::instrument(skip(self))] pub fn rooms_joined(&self, user_id: &UserId) -> impl Iterator> { @@ -2045,25 +2248,75 @@ impl Rooms { }) } + #[tracing::instrument(skip(self))] + pub fn invite_state( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result>>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&room_id.as_bytes()); + + self.userroomid_invitestate + .get(key)? + .map(|state| { + let state = serde_json::from_slice(&state) + .map_err(|_| Error::bad_database("Invalid state in userroomid_invitestate."))?; + + Ok(state) + }) + .transpose() + } + + #[tracing::instrument(skip(self))] + pub fn left_state( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result>>> { + let mut key = user_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(&room_id.as_bytes()); + + self.userroomid_leftstate + .get(key)? + .map(|state| { + let state = serde_json::from_slice(&state) + .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; + + Ok(state) + }) + .transpose() + } + /// Returns an iterator over all rooms a user left. #[tracing::instrument(skip(self))] - pub fn rooms_left(&self, user_id: &UserId) -> impl Iterator> { + pub fn rooms_left( + &self, + user_id: &UserId, + ) -> impl Iterator>)>> { let mut prefix = user_id.as_bytes().to_vec(); prefix.push(0xff); - self.userroomid_left.scan_prefix(prefix).keys().map(|key| { - Ok(RoomId::try_from( + self.userroomid_leftstate.scan_prefix(prefix).map(|r| { + let (key, state) = r?; + let room_id = RoomId::try_from( utils::string_from_bytes( - &key? - .rsplit(|&b| b == 0xff) + &key.rsplit(|&b| b == 0xff) .next() .expect("rsplit always returns an element"), ) .map_err(|_| { - Error::bad_database("Room ID in userroomid_left is invalid unicode.") + Error::bad_database("Room ID in userroomid_invited is invalid unicode.") })?, ) - .map_err(|_| Error::bad_database("Room ID in userroomid_left is invalid."))?) + .map_err(|_| Error::bad_database("Room ID in userroomid_invited is invalid."))?; + + let state = serde_json::from_slice(&state) + .map_err(|_| Error::bad_database("Invalid state in userroomid_leftstate."))?; + + Ok((room_id, state)) }) } @@ -2096,6 +2349,6 @@ impl Rooms { userroom_id.push(0xff); userroom_id.extend_from_slice(room_id.as_bytes()); - Ok(self.userroomid_left.get(userroom_id)?.is_some()) + Ok(self.userroomid_leftstate.get(userroom_id)?.is_some()) } } diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index 8e1d34f..c60c04e 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -1,9 +1,10 @@ use crate::Error; use ruma::{ + api::OutgoingResponse, identifiers::{DeviceId, UserId}, Outgoing, }; -use std::{convert::TryInto, ops::Deref}; +use std::ops::Deref; #[cfg(feature = "conduit_bin")] use { @@ -145,7 +146,7 @@ where let mut body = Vec::new(); handle.read_to_end(&mut body).await.unwrap(); - let http_request = http_request.body(body.clone()).unwrap(); + let http_request = http_request.body(&*body).unwrap(); debug!("{:?}", http_request); match ::try_from_http_request(http_request) { Ok(t) => Success(Ruma { @@ -178,9 +179,9 @@ impl Deref for Ruma { /// This struct converts ruma responses into rocket http responses. pub type ConduitResult = std::result::Result, Error>; -pub struct RumaResponse>>>(pub T); +pub struct RumaResponse(pub T); -impl>>> From for RumaResponse { +impl From for RumaResponse { fn from(t: T) -> Self { Self(t) } @@ -189,12 +190,11 @@ impl>>> From for RumaResponse { #[cfg(feature = "conduit_bin")] impl<'r, 'o, T> Responder<'r, 'o> for RumaResponse where - T: Send + TryInto>>, - T::Error: Send, + T: Send + OutgoingResponse, 'o: 'r, { fn respond_to(self, _: &'r Request<'_>) -> response::Result<'o> { - let http_response: Result, _> = self.0.try_into(); + let http_response: Result, _> = self.0.try_into_http_response(); match http_response { Ok(http_response) => { let mut response = rocket::response::Response::build(); diff --git a/src/server_server.rs b/src/server_server.rs index 1fad54e..304bc19 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -18,7 +18,7 @@ use ruma::{ query::get_profile_information, transactions::send_transaction_message, }, - OutgoingRequest, + IncomingResponse, OutgoingRequest, OutgoingResponse, }, directory::{IncomingFilter, IncomingRoomNetwork}, events::{ @@ -173,15 +173,10 @@ where let status = reqwest_response.status(); - let body = reqwest_response - .bytes() - .await - .unwrap_or_else(|e| { - warn!("server error {}", e); - Vec::new().into() - }) // TODO: handle timeout - .into_iter() - .collect::>(); + let body = reqwest_response.bytes().await.unwrap_or_else(|e| { + warn!("server error {}", e); + Vec::new().into() + }); // TODO: handle timeout if status != 200 { info!( @@ -195,7 +190,7 @@ where ); } - let response = T::IncomingResponse::try_from( + let response = T::IncomingResponse::try_from_http_response( http_response .body(body) .expect("reqwest body is valid http body"), @@ -350,6 +345,7 @@ pub fn get_server_version_route( .into()) } +// Response type for this endpoint is Json because we need to calculate a signature for the response #[cfg_attr(feature = "conduit_bin", get("/_matrix/key/v2/server"))] #[tracing::instrument(skip(db))] pub fn get_server_keys_route(db: State<'_, Database>) -> Json { @@ -369,7 +365,7 @@ pub fn get_server_keys_route(db: State<'_, Database>) -> Json { }, ); let mut response = serde_json::from_slice( - http::Response::try_from(get_server_keys::v2::Response { + get_server_keys::v2::Response { server_key: ServerSigningKeys { server_name: db.globals.server_name().to_owned(), verify_keys, @@ -377,7 +373,8 @@ pub fn get_server_keys_route(db: State<'_, Database>) -> Json { signatures: BTreeMap::new(), valid_until_ts: SystemTime::now() + Duration::from_secs(60 * 2), }, - }) + } + .try_into_http_response() .unwrap() .body(), ) @@ -745,7 +742,7 @@ fn handle_incoming_pdu<'a>( // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" - debug!("Fetching auth events."); + debug!("Fetching auth events for {}", incoming_pdu.event_id); fetch_and_handle_events( db, origin, @@ -757,7 +754,10 @@ fn handle_incoming_pdu<'a>( .map_err(|e| e.to_string())?; // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events - debug!("Checking auth."); + debug!( + "Auth check for {} based on auth events", + incoming_pdu.event_id + ); // Build map of auth events let mut auth_events = BTreeMap::new(); @@ -1151,7 +1151,7 @@ pub(crate) async fn fetch_and_handle_events( // a. Look at auth cache let pdu = match auth_cache.get(id) { Some(pdu) => { - debug!("Event found in cache"); + debug!("Found {} in cache", id); pdu.clone() } // b. Look in the main timeline (pduid_pdu tree) @@ -1159,12 +1159,12 @@ pub(crate) async fn fetch_and_handle_events( // (get_pdu checks both) None => match db.rooms.get_pdu(&id)? { Some(pdu) => { - debug!("Event found in outliers"); + debug!("Found {} in outliers", id); Arc::new(pdu) } None => { // d. Ask origin server over federation - debug!("Fetching event over federation: {:?}", id); + debug!("Fetching {} over federation.", id); match db .sending .send_federation_request( @@ -1175,7 +1175,7 @@ pub(crate) async fn fetch_and_handle_events( .await { Ok(res) => { - debug!("Got event over federation: {:?}", res); + debug!("Got {} over federation: {:?}", id, res); let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu)?; let pdu = handle_incoming_pdu(