From 6e5b35ea92075cdc9fc62db0f7f946ae6b80d76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 8 Dec 2020 10:33:44 +0100 Subject: [PATCH] feat: implement appservices this also reverts some stateres changes --- Cargo.lock | 247 ++++++++------------------------ Cargo.toml | 8 +- src/appservice_server.rs | 104 ++++++++++++++ src/client_server/account.rs | 35 +++-- src/client_server/alias.rs | 41 ++++-- src/client_server/media.rs | 2 +- src/client_server/membership.rs | 6 + src/client_server/message.rs | 3 +- src/client_server/profile.rs | 2 + src/client_server/redact.rs | 1 + src/client_server/room.rs | 15 ++ src/client_server/state.rs | 3 +- src/client_server/to_device.rs | 2 +- src/database.rs | 8 ++ src/database/admin.rs | 61 +++++--- src/database/appservice.rs | 67 +++++++++ src/database/globals.rs | 12 +- src/database/media.rs | 7 +- src/database/rooms.rs | 122 +++++++--------- src/database/sending.rs | 240 +++++++++++++++++++++---------- src/database/transaction_ids.rs | 8 +- src/error.rs | 4 +- src/lib.rs | 1 + src/main.rs | 4 +- src/ruma_wrapper.rs | 88 +++++++++--- src/server_server.rs | 189 +++--------------------- 26 files changed, 696 insertions(+), 584 deletions(-) create mode 100644 src/appservice_server.rs create mode 100644 src/database/appservice.rs diff --git a/Cargo.lock b/Cargo.lock index 5062b8c..6566b10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,15 +21,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi 0.3.9", -] - [[package]] name = "arrayref" version = "0.3.6" @@ -182,19 +173,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" -dependencies = [ - "libc", - "num-integer", - "num-traits", - "time 0.1.44", - "winapi 0.3.9", -] - [[package]] name = "color_quant" version = "1.1.0" @@ -219,6 +197,7 @@ dependencies = [ "rust-argon2", "serde", "serde_json", + "serde_yaml", "sled", "state-res", "thiserror", @@ -254,7 +233,7 @@ version = "0.15.0-dev" source = "git+https://github.com/SergioBenitez/cookie-rs.git?rev=1c3ca83#1c3ca838543b60a4448d279dc4b903cc7a2bc22a" dependencies = [ "percent-encoding", - "time 0.2.23", + "time", "version_check", ] @@ -374,6 +353,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "dtoa" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" + [[package]] name = "either" version = "1.6.1" @@ -575,19 +560,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "generator" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc" -dependencies = [ - "cc", - "libc", - "log", - "rustc_version", - "winapi 0.3.9", -] - [[package]] name = "getrandom" version = "0.1.15" @@ -596,7 +568,7 @@ checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" dependencies = [ "cfg-if 0.1.10", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -886,9 +858,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" +checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "linked-hash-map" @@ -914,19 +886,6 @@ dependencies = [ "cfg-if 0.1.10", ] -[[package]] -name = "loom" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" -dependencies = [ - "cfg-if 0.1.10", - "generator", - "scoped-tls", - "serde", - "serde_json", -] - [[package]] name = "lru-cache" version = "0.1.2" @@ -948,15 +907,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata", -] - [[package]] name = "matches" version = "0.1.8" @@ -1468,31 +1418,6 @@ dependencies = [ "syn", ] -[[package]] -name = "regex" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" -dependencies = [ - "regex-syntax", -] - -[[package]] -name = "regex-automata" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" -dependencies = [ - "byteorder", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" - [[package]] name = "remove_dir_all" version = "0.5.3" @@ -1585,7 +1510,7 @@ dependencies = [ "rocket_http", "serde", "state", - "time 0.2.23", + "time", "tokio", "ubyte", "version_check", @@ -1622,7 +1547,7 @@ dependencies = [ "ref-cast", "smallvec", "state", - "time 0.2.23", + "time", "tokio", "tokio-rustls", "uncased", @@ -1633,7 +1558,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "js_int", @@ -1651,7 +1576,7 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "http", "percent-encoding", @@ -1666,7 +1591,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1677,7 +1602,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "ruma-api", "ruma-common", @@ -1691,7 +1616,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "assign", "http", @@ -1710,7 +1635,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "maplit", @@ -1723,7 +1648,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-common", @@ -1737,7 +1662,7 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = "0.22.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1748,7 +1673,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.0.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "js_int", "ruma-api", @@ -1763,7 +1688,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "paste", "rand", @@ -1777,7 +1702,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.17.4" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro2", "quote", @@ -1788,7 +1713,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.1.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "serde", ] @@ -1796,7 +1721,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.2.3" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "form_urlencoded", "itoa", @@ -1809,7 +1734,7 @@ dependencies = [ [[package]] name = "ruma-serde-macros" version = "0.2.0" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1820,7 +1745,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0-dev.1" -source = "git+https://github.com/ruma/ruma?rev=e8882fe8142d7b55ed4c8ccc6150946945f9e237#e8882fe8142d7b55ed4c8ccc6150946945f9e237" +source = "git+https://github.com/ruma/ruma?rev=ee814aa84934530d76f5e4b275d739805b49bdef#ee814aa84934530d76f5e4b275d739805b49bdef" dependencies = [ "base64 0.12.3", "ring", @@ -1948,18 +1873,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" +checksum = "06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" +checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df" dependencies = [ "proc-macro2", "quote", @@ -1989,22 +1914,24 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7baae0a99f1a324984bcdc5f0718384c1f69775f1c7eec8b859b71b443e3fd7" +dependencies = [ + "dtoa", + "linked-hash-map", + "serde", + "yaml-rust", +] + [[package]] name = "sha1" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" -[[package]] -name = "sharded-slab" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127" -dependencies = [ - "lazy_static", - "loom", -] - [[package]] name = "signal-hook-registry" version = "1.2.2" @@ -2078,17 +2005,15 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#99214e6fa6b9843b0d9e1f6ef0698d7fdb234fb2" +source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#a1c15253f0777baad251da47c3f2c016cfed6f7e" dependencies = [ "itertools", - "js_int", "maplit", "ruma", "serde", "serde_json", "thiserror", "tracing", - "tracing-subscriber", ] [[package]] @@ -2163,9 +2088,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.53" +version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8833e20724c24de12bbaba5ad230ea61c3eafb05b881c7c9d3cfe8638b187e68" +checksum = "9a2af957a63d6bd42255c359c93d9bfdb97076bd3b820897ce55ffbfbf107f44" dependencies = [ "proc-macro2", "quote", @@ -2206,26 +2131,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "time" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi 0.3.9", -] - [[package]] name = "time" version = "0.2.23" @@ -2407,49 +2312,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" -dependencies = [ - "serde", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1fa8f0c8f4c594e4fc9debc1990deab13238077271ba84dd853d54902ee3401" -dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers", - "regex", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", - "tracing-serde", -] - [[package]] name = "trust-dns-proto" version = "0.19.6" @@ -2599,12 +2461,6 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasm-bindgen" version = "0.2.69" @@ -2791,6 +2647,15 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "yaml-rust" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39f0c922f1a334134dc2f7a8b67dc5d25f0735263feec974345ff706bcf20b0d" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "yansi" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index b1dec17..1e4afe2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,14 +18,14 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "1f1f44f33 #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "e8882fe8142d7b55ed4c8ccc6150946945f9e237" } +ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "ee814aa84934530d76f5e4b275d739805b49bdef" } # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "unstable-join" } # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } # Used when doing state resolution -# state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] } +# state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] } -# state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } +#state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio tokio = { version = "0.2.23" } @@ -41,6 +41,8 @@ directories = "3.0.1" js_int = "0.1.9" # Used for ruma wrapper serde_json = { version = "1.0.60", features = ["raw_value"] } +# Used for appservice registration files +serde_yaml = "0.8.14" # Used for pdu definition serde = "1.0.117" # Used for secure identifiers diff --git a/src/appservice_server.rs b/src/appservice_server.rs new file mode 100644 index 0000000..f1436e0 --- /dev/null +++ b/src/appservice_server.rs @@ -0,0 +1,104 @@ +use crate::{utils, Error, Result}; +use http::header::{HeaderValue, CONTENT_TYPE}; +use log::warn; +use ruma::api::OutgoingRequest; +use std::{ + convert::{TryFrom, TryInto}, + fmt::Debug, + time::Duration, +}; + +pub async fn send_request( + globals: &crate::database::globals::Globals, + registration: serde_yaml::Value, + request: T, +) -> Result +where + T: Debug, +{ + let destination = registration.get("url").unwrap().as_str().unwrap(); + let hs_token = registration.get("hs_token").unwrap().as_str().unwrap(); + + let mut http_request = request + .try_into_http_request(&destination, Some("")) + .unwrap(); + + let mut parts = http_request.uri().clone().into_parts(); + let old_path_and_query = parts.path_and_query.unwrap().as_str().to_owned(); + let symbol = if old_path_and_query.contains("?") { + "&" + } else { + "?" + }; + + parts.path_and_query = Some( + (old_path_and_query + symbol + "access_token=" + hs_token) + .parse() + .unwrap(), + ); + *http_request.uri_mut() = parts.try_into().expect("our manipulation is always valid"); + + http_request.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str("application/json").unwrap(), + ); + + let mut reqwest_request = reqwest::Request::try_from(http_request) + .expect("all http requests are valid reqwest requests"); + + *reqwest_request.timeout_mut() = Some(Duration::from_secs(30)); + + let url = reqwest_request.url().clone(); + let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; + + // Because reqwest::Response -> http::Response is complicated: + match reqwest_response { + Ok(mut reqwest_response) => { + let status = reqwest_response.status(); + let mut http_response = http::Response::builder().status(status); + let headers = http_response.headers_mut().unwrap(); + + for (k, v) in reqwest_response.headers_mut().drain() { + if let Some(key) = k { + headers.insert(key, v); + } + } + + let status = reqwest_response.status(); + + let body = reqwest_response + .bytes() + .await + .unwrap_or_else(|e| { + warn!("server error: {}", e); + Vec::new().into() + }) // TODO: handle timeout + .into_iter() + .collect::>(); + + if status != 200 { + warn!( + "Server returned bad response {} ({}): {} {:?}", + destination, + url, + status, + utils::string_from_bytes(&body) + ); + } + + let response = T::IncomingResponse::try_from( + http_response + .body(body) + .expect("reqwest body is valid http body"), + ); + response.map_err(|_| { + warn!( + "Server returned invalid response bytes {} ({})", + destination, url + ); + Error::BadServerResponse("Server returned bad response.") + }) + } + Err(e) => Err(e.into()), + } +} diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 76bbebb..8fb926e 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -139,18 +139,20 @@ pub async fn register_route( auth_error: None, }; - if let Some(auth) = &body.auth { - let (worked, uiaainfo) = - db.uiaa - .try_auth(&user_id, "".into(), auth, &uiaainfo, &db.users, &db.globals)?; - if !worked { + if !body.from_appservice { + if let Some(auth) = &body.auth { + let (worked, uiaainfo) = + db.uiaa + .try_auth(&user_id, "".into(), auth, &uiaainfo, &db.users, &db.globals)?; + if !worked { + return Err(Error::Uiaa(uiaainfo)); + } + // Success! + } else { + uiaainfo.session = Some(utils::random_string(SESSION_ID_LENGTH)); + db.uiaa.create(&user_id, "".into(), &uiaainfo)?; return Err(Error::Uiaa(uiaainfo)); } - // Success! - } else { - uiaainfo.session = Some(utils::random_string(SESSION_ID_LENGTH)); - db.uiaa.create(&user_id, "".into(), &uiaainfo)?; - return Err(Error::Uiaa(uiaainfo)); } if missing_username { @@ -241,6 +243,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 2. Make conduit bot join @@ -265,6 +268,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 3. Power levels @@ -302,6 +306,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 4.1 Join Rules @@ -322,6 +327,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 4.2 History Visibility @@ -344,6 +350,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 4.3 Guest Access @@ -364,6 +371,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 6. Events implied by name and topic @@ -386,6 +394,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.rooms.build_and_append_pdu( @@ -405,6 +414,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // Room alias @@ -430,6 +440,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; @@ -456,6 +467,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.rooms.build_and_append_pdu( PduBuilder { @@ -478,6 +490,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // Send welcome message @@ -506,6 +519,7 @@ pub async fn register_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } @@ -681,6 +695,7 @@ pub async fn deactivate_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } diff --git a/src/client_server/alias.rs b/src/client_server/alias.rs index 094e70a..ec73ffc 100644 --- a/src/client_server/alias.rs +++ b/src/client_server/alias.rs @@ -1,7 +1,8 @@ use super::State; -use crate::{server_server, ConduitResult, Database, Error, Ruma}; +use crate::{appservice_server, server_server, ConduitResult, Database, Error, Ruma}; use ruma::{ api::{ + appservice, client::{ error::ErrorKind, r0::alias::{create_alias, delete_alias, get_alias}, @@ -75,13 +76,37 @@ pub async fn get_alias_helper( return Ok(get_alias::Response::new(response.room_id, response.servers).into()); } - let room_id = db - .rooms - .id_from_alias(&room_alias)? - .ok_or(Error::BadRequest( - ErrorKind::NotFound, - "Room with alias not found.", - ))?; + let mut room_id = None; + match db.rooms.id_from_alias(&room_alias)? { + Some(r) => room_id = Some(r), + None => { + for (_id, registration) in db.appservice.iter_all().filter_map(|r| r.ok()) { + if appservice_server::send_request( + &db.globals, + registration, + appservice::query::query_room_alias::v1::Request { room_alias }, + ) + .await + .is_ok() + { + room_id = Some(db.rooms.id_from_alias(&room_alias)?.ok_or_else(|| { + Error::bad_config("Appservice lied to us. Room does not exist.") + })?); + break; + } + } + } + }; + + let room_id = match room_id { + Some(room_id) => room_id, + None => { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Room with alias not found.", + )) + } + }; Ok(get_alias::Response::new(room_id, vec![db.globals.server_name().to_owned()]).into()) } diff --git a/src/client_server/media.rs b/src/client_server/media.rs index e6bd182..0776c9e 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -45,7 +45,7 @@ pub async fn create_content_route( db.flush().await?; - Ok(create_content::Response { content_uri: mxc }.into()) + Ok(create_content::Response { content_uri: mxc, blurhash: None }.into()) } #[cfg_attr( diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 6d3a690..46548d5 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -128,6 +128,7 @@ pub async fn leave_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.flush().await?; @@ -167,6 +168,7 @@ pub async fn invite_user_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.flush().await?; @@ -222,6 +224,7 @@ pub async fn kick_user_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.flush().await?; @@ -281,6 +284,7 @@ pub async fn ban_user_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.flush().await?; @@ -332,6 +336,7 @@ pub async fn unban_user_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.flush().await?; @@ -713,6 +718,7 @@ async fn join_room_by_id_helper( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 327b9ab..3640730 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -22,7 +22,7 @@ pub async fn send_message_event_route( body: Ruma>, ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let sender_device = body.sender_device.as_ref().expect("user is authenticated"); + let sender_device = body.sender_device.as_deref(); // Check if this is a new transaction id if let Some(response) = @@ -69,6 +69,7 @@ pub async fn send_message_event_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.transaction_ids.add_txnid( diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 828d259..761443d 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -67,6 +67,7 @@ pub async fn set_displayname_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // Presence update @@ -163,6 +164,7 @@ pub async fn set_avatar_url_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // Presence update diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index 6f7728a..212e751 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -35,6 +35,7 @@ pub async fn redact_event_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.flush().await?; diff --git a/src/client_server/room.rs b/src/client_server/room.rs index f92fc8d..e473e6e 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -69,6 +69,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 2. Let the room creator join @@ -93,6 +94,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 3. Power levels @@ -137,6 +139,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 4. Events set by preset @@ -176,6 +179,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 4.2 History Visibility @@ -196,6 +200,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 4.3 Guest Access @@ -224,6 +229,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // 5. Events listed in initial_state @@ -246,6 +252,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } @@ -270,6 +277,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } @@ -291,6 +299,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } @@ -317,6 +326,7 @@ pub async fn create_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } @@ -407,6 +417,7 @@ pub async fn upgrade_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // Get the old room federations status @@ -450,6 +461,7 @@ pub async fn upgrade_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // Join the new room @@ -474,6 +486,7 @@ pub async fn upgrade_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; // Recommended transferable state events list from the specs @@ -510,6 +523,7 @@ pub async fn upgrade_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; } @@ -556,6 +570,7 @@ pub async fn upgrade_room_route( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; db.flush().await?; diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 010b20d..cecb79d 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -63,8 +63,8 @@ pub async fn send_state_event_for_empty_key_route( let Ruma { body, sender_user, - sender_device: _, json_body, + .. } = body; let json = serde_json::from_str::( @@ -288,6 +288,7 @@ pub async fn send_state_event_for_key_helper( &db.sending, &db.admin, &db.account_data, + &db.appservice, )?; Ok(event_id) diff --git a/src/client_server/to_device.rs b/src/client_server/to_device.rs index 8cc3e29..5bc001e 100644 --- a/src/client_server/to_device.rs +++ b/src/client_server/to_device.rs @@ -17,7 +17,7 @@ pub async fn send_event_to_device_route( body: Ruma>, ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let sender_device = body.sender_device.as_ref().expect("user is authenticated"); + let sender_device = body.sender_device.as_deref(); // Check if this is a new transaction id if db diff --git a/src/database.rs b/src/database.rs index 4905070..5150517 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,5 +1,6 @@ pub mod account_data; pub mod admin; +pub mod appservice; pub mod globals; pub mod key_backups; pub mod media; @@ -16,6 +17,8 @@ use log::info; use rocket::futures::{self, channel::mpsc}; use ruma::{DeviceId, ServerName, UserId}; use serde::Deserialize; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; use std::{convert::TryInto, fs::remove_dir_all}; #[derive(Clone, Deserialize)] @@ -59,6 +62,7 @@ pub struct Database { pub transaction_ids: transaction_ids::TransactionIds, pub sending: sending::Sending, pub admin: admin::Admin, + pub appservice: appservice::Appservice, pub _db: sled::Db, } @@ -180,6 +184,10 @@ impl Database { admin: admin::Admin { sender: admin_sender, }, + appservice: appservice::Appservice { + cached_registrations: Arc::new(RwLock::new(HashMap::new())), + id_appserviceregistrations: db.open_tree("id_appserviceregistrations")?, + }, _db: db, }; diff --git a/src/database/admin.rs b/src/database/admin.rs index 778796f..7de6bf9 100644 --- a/src/database/admin.rs +++ b/src/database/admin.rs @@ -10,7 +10,9 @@ use ruma::{ use tokio::select; pub enum AdminCommand { - SendTextMessage(message::TextMessageEventContent), + RegisterAppservice(serde_yaml::Value), + ListAppservices, + SendMessage(message::MessageEventContent), } #[derive(Clone)] @@ -44,28 +46,49 @@ impl Admin { warn!("Conduit instance does not have an #admins room. Logging to that room will not work."); } + let send_message = |message: message::MessageEventContent| { + if let Some(conduit_room) = &conduit_room { + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMessage, + content: serde_json::to_value(message) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &db.globals, + &db.sending, + &db.admin, + &db.account_data, + &db.appservice, + ) + .unwrap(); + } + }; + loop { select! { Some(event) = receiver.next() => { match event { - AdminCommand::SendTextMessage(message) => { - if let Some(conduit_room) = &conduit_room { - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMessage, - content: serde_json::to_value(message).expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &db.globals, - &db.sending, - &db.admin, - &db.account_data, - ).unwrap(); - } + AdminCommand::RegisterAppservice(yaml) => { + db.appservice.register_appservice(yaml).unwrap(); // TODO handle error + } + AdminCommand::ListAppservices => { + let appservices = db.appservice.iter_ids().collect::>(); + let count = appservices.len(); + let output = format!( + "Appservices ({}): {}", + count, + appservices.into_iter().filter_map(|r| r.ok()).collect::>().join(", ") + ); + send_message(message::MessageEventContent::text_plain(output)); + } + AdminCommand::SendMessage(message) => { + send_message(message); } } } diff --git a/src/database/appservice.rs b/src/database/appservice.rs new file mode 100644 index 0000000..26ea5b9 --- /dev/null +++ b/src/database/appservice.rs @@ -0,0 +1,67 @@ +use crate::{utils, Error, Result}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +#[derive(Clone)] +pub struct Appservice { + pub(super) cached_registrations: Arc>>, + pub(super) id_appserviceregistrations: sled::Tree, +} + +impl Appservice { + pub fn register_appservice(&self, yaml: serde_yaml::Value) -> Result<()> { + // TODO: Rumaify + let id = yaml.get("id").unwrap().as_str().unwrap(); + self.id_appserviceregistrations + .insert(id, serde_yaml::to_string(&yaml).unwrap().as_bytes())?; + self.cached_registrations + .write() + .unwrap() + .insert(id.to_owned(), yaml); + + Ok(()) + } + + pub fn get_registration(&self, id: &str) -> Result> { + self.cached_registrations + .read() + .unwrap() + .get(id) + .map_or_else( + || { + Ok(self + .id_appserviceregistrations + .get(id)? + .map(|bytes| { + Ok::<_, Error>(serde_yaml::from_slice(&bytes).map_err(|_| { + Error::bad_database( + "Invalid registration bytes in id_appserviceregistrations.", + ) + })?) + }) + .transpose()?) + }, + |r| Ok(Some(r.clone())), + ) + } + + pub fn iter_ids(&self) -> impl Iterator> { + self.id_appserviceregistrations.iter().keys().map(|id| { + Ok(utils::string_from_bytes(&id?).map_err(|_| { + Error::bad_database("Invalid id bytes in id_appserviceregistrations.") + })?) + }) + } + + pub fn iter_all<'a>( + &'a self, + ) -> impl Iterator> + 'a { + self.iter_ids().filter_map(|id| id.ok()).map(move |id| { + Ok(( + id.clone(), + self.get_registration(&id)? + .expect("iter_ids only returns appservices that exist"), + )) + }) + } +} diff --git a/src/database/globals.rs b/src/database/globals.rs index 1221609..e913c0f 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -1,10 +1,10 @@ use crate::{database::Config, utils, Error, Result}; -use trust_dns_resolver::TokioAsyncResolver; -use std::collections::HashMap; use log::error; use ruma::ServerName; +use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; +use trust_dns_resolver::TokioAsyncResolver; pub const COUNTER: &str = "c"; @@ -59,9 +59,11 @@ impl Globals { config, keypair: Arc::new(keypair), reqwest_client: reqwest::Client::new(), - dns_resolver: TokioAsyncResolver::tokio_from_system_conf().await.map_err(|_| { - Error::bad_config("Failed to set up trust dns resolver with system config.") - })?, + dns_resolver: TokioAsyncResolver::tokio_from_system_conf() + .await + .map_err(|_| { + Error::bad_config("Failed to set up trust dns resolver with system config.") + })?, actual_destination_cache: Arc::new(RwLock::new(HashMap::new())), }) } diff --git a/src/database/media.rs b/src/database/media.rs index 89d48e1..448d071 100644 --- a/src/database/media.rs +++ b/src/database/media.rs @@ -290,7 +290,12 @@ impl Media { file: thumbnail_bytes.to_vec(), })) } else { - Ok(None) + // Couldn't parse file to generate thumbnail, send original + Ok(Some(FileMeta { + filename, + content_type, + file: file.to_vec(), + })) } } else { Ok(None) diff --git a/src/database/rooms.rs b/src/database/rooms.rs index fb139a6..3e2a17f 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -36,16 +36,6 @@ use super::admin::AdminCommand; /// hashing the entire state. pub type StateHashId = IVec; -/// An enum that represents the two valid states when searching -/// for an events "parent". -/// -/// An events parent is any event we are aware of that is part of -/// the events `prev_events` array. -pub(crate) enum ClosestParent { - Append, - Insert(u64), -} - #[derive(Clone)] pub struct Rooms { pub edus: edus::RoomEdus, @@ -411,54 +401,6 @@ impl Rooms { } } - /// Recursively search for a PDU from our DB that is also in the - /// `prev_events` field of the incoming PDU. - /// - /// First we check if the last PDU inserted to the given room is a parent - /// if not we recursively check older `prev_events` to insert the incoming - /// event after. - pub(crate) fn get_latest_pduid_before( - &self, - room: &RoomId, - incoming_prev_ids: &[EventId], - their_state: &BTreeMap>, - ) -> Result> { - match self.pduid_pdu.scan_prefix(room.as_bytes()).last() { - Some(Ok(val)) - if incoming_prev_ids.contains( - &serde_json::from_slice::(&val.1) - .map_err(|_| { - Error::bad_database("last DB entry contains invalid PDU bytes") - })? - .event_id, - ) => - { - Ok(Some(ClosestParent::Append)) - } - _ => { - let mut prev_ids = incoming_prev_ids.to_vec(); - while let Some(id) = prev_ids.pop() { - match self.get_pdu_id(&id)? { - Some(pdu_id) => { - return Ok(Some(ClosestParent::Insert(self.pdu_count(&pdu_id)?))); - } - None => { - prev_ids.extend(their_state.get(&id).map_or( - Err(Error::BadServerResponse( - "Failed to find previous event for PDU in state", - )), - // `prev_event_ids` will return an empty Vec instead of failing - // so it works perfect for our use here - |pdu| Ok(pdu.prev_event_ids()), - )?); - } - } - } - Ok(None) - } - } - } - /// Returns the leaf pdus of a room. pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result> { let mut prefix = room_id.as_bytes().to_vec(); @@ -583,18 +525,59 @@ impl Rooms { .as_ref() == Some(&pdu.room_id) { - let mut parts = body.split_whitespace().skip(1); + let mut lines = body.lines(); + let command_line = lines.next().expect("each string has at least one line"); + let body = lines.collect::>(); + + let mut parts = command_line.split_whitespace().skip(1); if let Some(command) = parts.next() { let args = parts.collect::>(); - admin.send(AdminCommand::SendTextMessage( - message::TextMessageEventContent { - body: format!("Command: {}, Args: {:?}", command, args), - formatted: None, - relates_to: None, - new_content: None, - }, - )); + match command { + "register_appservice" => { + if body.len() > 2 + && body[0].trim() == "```" + && body.last().unwrap().trim() == "```" + { + let appservice_config = body[1..body.len() - 1].join("\n"); + let parsed_config = serde_yaml::from_str::( + &appservice_config, + ); + match parsed_config { + Ok(yaml) => { + admin.send(AdminCommand::RegisterAppservice(yaml)); + } + Err(e) => { + admin.send(AdminCommand::SendMessage( + message::MessageEventContent::text_plain( + format!( + "Could not parse appservice config: {}", + e + ), + ), + )); + } + } + } else { + admin.send(AdminCommand::SendMessage( + message::MessageEventContent::text_plain( + "Expected code block in command body.", + ), + )); + } + } + "list_appservices" => { + admin.send(AdminCommand::ListAppservices); + } + _ => { + admin.send(AdminCommand::SendMessage( + message::MessageEventContent::text_plain(format!( + "Command: {}, Args: {:?}", + command, args + )), + )); + } + } } } } @@ -675,6 +658,7 @@ impl Rooms { sending: &super::sending::Sending, admin: &super::admin::Admin, account_data: &super::account_data::AccountData, + appservice: &super::appservice::Appservice, ) -> Result { let PduBuilder { event_type, @@ -923,6 +907,10 @@ impl Rooms { sending.send_pdu(&server, &pdu_id)?; } + for appservice in appservice.iter_all().filter_map(|r| r.ok()) { + sending.send_pdu_appservice(&appservice.0, &pdu_id)?; + } + Ok(pdu.event_id) } diff --git a/src/database/sending.rs b/src/database/sending.rs index cd88e08..7ce7d63 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -1,26 +1,35 @@ use std::{collections::HashMap, convert::TryFrom, time::SystemTime}; -use crate::{server_server, utils, Error, PduEvent, Result}; +use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; use federation::transactions::send_transaction_message; -use log::{debug, warn}; +use log::warn; use rocket::futures::stream::{FuturesUnordered, StreamExt}; -use ruma::{api::federation, ServerName}; +use ruma::{ + api::{appservice, federation}, + ServerName, +}; use sled::IVec; use tokio::select; #[derive(Clone)] pub struct Sending { /// The state for a given state hash. - pub(super) servernamepduids: sled::Tree, // ServernamePduId = ServerName + PduId - pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = ServerName + PduId (pduid can be empty for reservation) + pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+)ServerName + PduId + pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+)ServerName + PduId (pduid can be empty for reservation) } impl Sending { - pub fn start_handler(&self, globals: &super::globals::Globals, rooms: &super::rooms::Rooms) { + pub fn start_handler( + &self, + globals: &super::globals::Globals, + rooms: &super::rooms::Rooms, + appservice: &super::appservice::Appservice, + ) { let servernamepduids = self.servernamepduids.clone(); let servercurrentpdus = self.servercurrentpdus.clone(); let rooms = rooms.clone(); let globals = globals.clone(); + let appservice = appservice.clone(); tokio::spawn(async move { let mut futures = FuturesUnordered::new(); @@ -28,7 +37,7 @@ impl Sending { // Retry requests we could not finish yet let mut current_transactions = HashMap::new(); - for (server, pdu) in servercurrentpdus + for (server, pdu, is_appservice) in servercurrentpdus .iter() .filter_map(|r| r.ok()) .map(|(key, _)| { @@ -38,45 +47,61 @@ impl Sending { Error::bad_database("Invalid bytes in servercurrentpdus.") })?; + let server = utils::string_from_bytes(&server).map_err(|_| { + Error::bad_database("Invalid server bytes in server_currenttransaction") + })?; + + // Appservices start with a plus + let (server, is_appservice) = if server.starts_with("+") { + (&server[1..], true) + } else { + (&*server, false) + }; + Ok::<_, Error>(( - Box::::try_from(utils::string_from_bytes(&server).map_err( - |_| { - Error::bad_database( - "Invalid server bytes in server_currenttransaction", - ) - }, - )?) - .map_err(|_| { + Box::::try_from(server).map_err(|_| { Error::bad_database( "Invalid server string in server_currenttransaction", ) })?, IVec::from(pdu), + is_appservice, )) }) .filter_map(|r| r.ok()) - .filter(|(_, pdu)| !pdu.is_empty()) // Skip reservation key + .filter(|(_, pdu, _)| !pdu.is_empty()) // Skip reservation key .take(50) // This should not contain more than 50 anyway { current_transactions - .entry(server) + .entry((server, is_appservice)) .or_insert_with(Vec::new) .push(pdu); } - for (server, pdus) in current_transactions { - futures.push(Self::handle_event(server, pdus, &globals, &rooms)); + for ((server, is_appservice), pdus) in current_transactions { + futures.push(Self::handle_event( + server, + is_appservice, + pdus, + &globals, + &rooms, + &appservice, + )); } let mut subscriber = servernamepduids.watch_prefix(b""); loop { select! { - Some(server) = futures.next() => { - debug!("sending response: {:?}", &server); - match server { - Ok((server, _response)) => { - let mut prefix = server.as_bytes().to_vec(); + Some(response) = futures.next() => { + match response { + Ok((server, is_appservice)) => { + let mut prefix = if is_appservice { + "+".as_bytes().to_vec() + } else { + Vec::new() + }; + prefix.extend_from_slice(server.as_bytes()); prefix.push(0xff); for key in servercurrentpdus @@ -109,13 +134,13 @@ impl Sending { servernamepduids.remove(¤t_key).unwrap(); } - futures.push(Self::handle_event(server, new_pdus, &globals, &rooms)); + futures.push(Self::handle_event(server, is_appservice, new_pdus, &globals, &rooms, &appservice)); } else { servercurrentpdus.remove(&prefix).unwrap(); // servercurrentpdus with the prefix should be empty now } } - Err((server, e)) => { + Err((server, _is_appservice, e)) => { warn!("Couldn't send transaction to {}: {}", server, e) // TODO: exponential backoff } @@ -126,24 +151,37 @@ impl Sending { let servernamepduid = key.clone(); let mut parts = servernamepduid.splitn(2, |&b| b == 0xff); - if let Some((server, pdu_id)) = utils::string_from_bytes( + if let Some((server, is_appservice, pdu_id)) = utils::string_from_bytes( parts .next() .expect("splitn will always return 1 or more elements"), ) .map_err(|_| Error::bad_database("ServerName in servernamepduid bytes are invalid.")) - .and_then(|server_str| Box::::try_from(server_str) - .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))) + .map(|server_str| { + // Appservices start with a plus + if server_str.starts_with("+") { + (server_str[1..].to_owned(), true) + } else { + (server_str, false) + } + }) + .and_then(|(server_str, is_appservice)| Box::::try_from(server_str) + .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid.")).map(|s| (s, is_appservice))) .ok() - .and_then(|server| parts + .and_then(|(server, is_appservice)| parts .next() .ok_or_else(|| Error::bad_database("Invalid servernamepduid in db.")) .ok() - .map(|pdu_id| (server, pdu_id)) + .map(|pdu_id| (server, is_appservice, pdu_id)) ) // TODO: exponential backoff - .filter(|(server, _)| { - let mut prefix = server.to_string().as_bytes().to_vec(); + .filter(|(server, is_appservice, _)| { + let mut prefix = if *is_appservice { + "+".as_bytes().to_vec() + } else { + Vec::new() + }; + prefix.extend_from_slice(server.as_bytes()); prefix.push(0xff); servercurrentpdus @@ -154,7 +192,7 @@ impl Sending { servercurrentpdus.insert(&key, &[]).unwrap(); servernamepduids.remove(&key).unwrap(); - futures.push(Self::handle_event(server, vec![pdu_id.into()], &globals, &rooms)); + futures.push(Self::handle_event(server, is_appservice, vec![pdu_id.into()], &globals, &rooms, &appservice)); } } } @@ -172,56 +210,102 @@ impl Sending { Ok(()) } + pub fn send_pdu_appservice(&self, appservice_id: &str, pdu_id: &[u8]) -> Result<()> { + let mut key = "+".as_bytes().to_vec(); + key.extend_from_slice(appservice_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(pdu_id); + self.servernamepduids.insert(key, b"")?; + + Ok(()) + } + async fn handle_event( server: Box, + is_appservice: bool, pdu_ids: Vec, globals: &super::globals::Globals, rooms: &super::rooms::Rooms, - ) -> std::result::Result< - (Box, send_transaction_message::v1::Response), - (Box, Error), - > { - let pdu_jsons = pdu_ids - .iter() - .map(|pdu_id| { - Ok::<_, (Box, Error)>( - // TODO: check room version and remove event_id if needed - serde_json::from_str( - PduEvent::convert_to_outgoing_federation_event( - rooms - .get_pdu_json_from_id(pdu_id) - .map_err(|e| (server.clone(), e))? - .ok_or_else(|| { - ( - server.clone(), - Error::bad_database( - "Event in servernamepduids not found in db.", - ), - ) - })?, - ) - .json() - .get(), + appservice: &super::appservice::Appservice, + ) -> std::result::Result<(Box, bool), (Box, bool, Error)> { + if is_appservice { + let pdu_jsons = pdu_ids + .iter() + .map(|pdu_id| { + Ok::<_, (Box, Error)>( + rooms + .get_pdu_from_id(pdu_id) + .map_err(|e| (server.clone(), e))? + .ok_or_else(|| { + ( + server.clone(), + Error::bad_database( + "Event in servernamepduids not found in db.", + ), + ) + })? + .to_any_event(), ) - .expect("Raw<..> is always valid"), - ) - }) - .filter_map(|r| r.ok()) - .collect::>(); + }) + .filter_map(|r| r.ok()) + .collect::>(); + appservice_server::send_request( + &globals, + appservice + .get_registration(server.as_str()) + .unwrap() + .unwrap(), // TODO: handle error + appservice::event::push_events::v1::Request { + events: &pdu_jsons, + txn_id: &utils::random_string(16), + }, + ) + .await + .map(|_response| (server.clone(), is_appservice)) + .map_err(|e| (server, is_appservice, e)) + } else { + let pdu_jsons = pdu_ids + .iter() + .map(|pdu_id| { + Ok::<_, (Box, Error)>( + // TODO: check room version and remove event_id if needed + serde_json::from_str( + PduEvent::convert_to_outgoing_federation_event( + rooms + .get_pdu_json_from_id(pdu_id) + .map_err(|e| (server.clone(), e))? + .ok_or_else(|| { + ( + server.clone(), + Error::bad_database( + "Event in servernamepduids not found in db.", + ), + ) + })?, + ) + .json() + .get(), + ) + .expect("Raw<..> is always valid"), + ) + }) + .filter_map(|r| r.ok()) + .collect::>(); - server_server::send_request( - &globals, - server.clone(), - send_transaction_message::v1::Request { - origin: globals.server_name(), - pdus: &pdu_jsons, - edus: &[], - origin_server_ts: SystemTime::now(), - transaction_id: &utils::random_string(16), - }, - ) - .await - .map(|response| (server.clone(), response)) - .map_err(|e| (server, e)) + server_server::send_request( + &globals, + server.clone(), + send_transaction_message::v1::Request { + origin: globals.server_name(), + pdus: &pdu_jsons, + edus: &[], + origin_server_ts: SystemTime::now(), + transaction_id: &utils::random_string(16), + }, + ) + .await + .map(|_response| (server.clone(), is_appservice)) + .map_err(|e| (server, is_appservice, e)) + } } } diff --git a/src/database/transaction_ids.rs b/src/database/transaction_ids.rs index 7c0eb98..1f8ba7d 100644 --- a/src/database/transaction_ids.rs +++ b/src/database/transaction_ids.rs @@ -11,13 +11,13 @@ impl TransactionIds { pub fn add_txnid( &self, user_id: &UserId, - device_id: &DeviceId, + device_id: Option<&DeviceId>, txn_id: &str, data: &[u8], ) -> Result<()> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); - key.extend_from_slice(device_id.as_bytes()); + key.extend_from_slice(device_id.map(|d| d.as_bytes()).unwrap_or_default()); key.push(0xff); key.extend_from_slice(txn_id.as_bytes()); @@ -29,12 +29,12 @@ impl TransactionIds { pub fn existing_txnid( &self, user_id: &UserId, - device_id: &DeviceId, + device_id: Option<&DeviceId>, txn_id: &str, ) -> Result> { let mut key = user_id.as_bytes().to_vec(); key.push(0xff); - key.extend_from_slice(device_id.as_bytes()); + key.extend_from_slice(device_id.map(|d| d.as_bytes()).unwrap_or_default()); key.push(0xff); key.extend_from_slice(txn_id.as_bytes()); diff --git a/src/error.rs b/src/error.rs index 316ca74..7d4a751 100644 --- a/src/error.rs +++ b/src/error.rs @@ -142,8 +142,8 @@ impl log::Log for ConduitLogger { mut_last_logs.insert(output.clone(), Instant::now()); } - self.db.admin.send(AdminCommand::SendTextMessage( - message::TextMessageEventContent::plain(output), + self.db.admin.send(AdminCommand::SendMessage( + message::MessageEventContent::text_plain(output), )); } } diff --git a/src/lib.rs b/src/lib.rs index eea32c7..aed129f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod appservice_server; pub mod client_server; mod database; mod error; diff --git a/src/main.rs b/src/main.rs index 58d3427..9574894 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![warn(rust_2018_idioms)] +pub mod appservice_server; pub mod client_server; pub mod server_server; @@ -139,7 +140,8 @@ fn setup_rocket() -> rocket::Rocket { .await .expect("config is valid"); - data.sending.start_handler(&data.globals, &data.rooms); + data.sending + .start_handler(&data.globals, &data.rooms, &data.appservice); log::set_boxed_logger(Box::new(ConduitLogger { db: data.clone(), last_logs: Default::default(), diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index 9597ac8..0fdca74 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -12,7 +12,7 @@ use std::{ #[cfg(feature = "conduit_bin")] use { crate::utils, - log::warn, + log::{debug, warn}, rocket::{ data::{ ByteUnit, Data, FromDataFuture, FromTransformedData, Transform, TransformFuture, @@ -34,6 +34,7 @@ pub struct Ruma { pub sender_user: Option, pub sender_device: Option>, pub json_body: Option>, // This is None when body is not a valid string + pub from_appservice: bool, } #[cfg(feature = "conduit_bin")] @@ -66,28 +67,72 @@ where .await .expect("database was loaded"); - let (sender_user, sender_device) = match T::METADATA.authentication { - AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => { - // Get token from header or query value - let token = match request - .headers() - .get_one("Authorization") - .map(|s| s[7..].to_owned()) // Split off "Bearer " - .or_else(|| request.get_query_value("access_token").and_then(|r| r.ok())) - { - // TODO: M_MISSING_TOKEN - None => return Failure((Status::Unauthorized, ())), - Some(token) => token, - }; + // Get token from header or query value + let token = request + .headers() + .get_one("Authorization") + .map(|s| s[7..].to_owned()) // Split off "Bearer " + .or_else(|| request.get_query_value("access_token").and_then(|r| r.ok())); - // Check if token is valid - match db.users.find_from_token(&token).unwrap() { - // TODO: M_UNKNOWN_TOKEN - None => return Failure((Status::Unauthorized, ())), - Some((user_id, device_id)) => (Some(user_id), Some(device_id.into())), + let (sender_user, sender_device, from_appservice) = if let Some((_id, registration)) = + db.appservice + .iter_all() + .filter_map(|r| r.ok()) + .find(|(_id, registration)| { + registration + .get("as_token") + .and_then(|as_token| as_token.as_str()) + .map_or(false, |as_token| token.as_deref() == Some(as_token)) + }) { + match T::METADATA.authentication { + AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => { + let user_id = request.get_query_value::("user_id").map_or_else( + || { + UserId::parse_with_server_name( + registration + .get("sender_localpart") + .unwrap() + .as_str() + .unwrap(), + db.globals.server_name(), + ) + .unwrap() + }, + |string| { + UserId::try_from(string.expect("parsing to string always works")) + .unwrap() + }, + ); + + if !db.users.exists(&user_id).unwrap() { + return Failure((Status::Unauthorized, ())); + } + + // TODO: Check if appservice is allowed to be that user + (Some(user_id), None, true) } + AuthScheme::ServerSignatures => (None, None, true), + AuthScheme::None => (None, None, true), + } + } else { + match T::METADATA.authentication { + AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => { + if let Some(token) = token { + match db.users.find_from_token(&token).unwrap() { + // TODO: M_UNKNOWN_TOKEN + None => return Failure((Status::Unauthorized, ())), + Some((user_id, device_id)) => { + (Some(user_id), Some(device_id.into()), false) + } + } + } else { + // TODO: M_MISSING_TOKEN + return Failure((Status::Unauthorized, ())); + } + } + AuthScheme::ServerSignatures => (None, None, false), + AuthScheme::None => (None, None, false), } - _ => (None, None), }; let mut http_request = http::Request::builder() @@ -103,7 +148,7 @@ where handle.read_to_end(&mut body).await.unwrap(); let http_request = http_request.body(body.clone()).unwrap(); - log::debug!("{:?}", http_request); + debug!("{:?}", http_request); match ::Incoming::try_from(http_request) { Ok(t) => Success(Ruma { @@ -114,6 +159,7 @@ where json_body: utils::string_from_bytes(&body) .ok() .and_then(|s| serde_json::value::RawValue::from_string(s).ok()), + from_appservice, }), Err(e) => { warn!("{:?}", e); diff --git a/src/server_server.rs b/src/server_server.rs index 58dd872..7d12c54 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,10 +1,7 @@ -use crate::{ - client_server, database::rooms::ClosestParent, utils, ConduitResult, Database, Error, PduEvent, - Result, Ruma, -}; +use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; -use log::{error, warn}; +use log::warn; use rocket::{get, post, put, response::content::Json, State}; use ruma::{ api::{ @@ -27,7 +24,6 @@ use std::{ collections::BTreeMap, convert::TryFrom, fmt::Debug, - sync::Arc, time::{Duration, SystemTime}, }; @@ -73,7 +69,6 @@ where .cloned(); let (actual_destination, host) = if let Some(result) = maybe_result { - println!("Loaded {} -> {:?}", destination, result); result } else { let result = find_actual_destination(globals, &destination).await; @@ -82,7 +77,6 @@ where .write() .unwrap() .insert(destination.clone(), result.clone()); - println!("Saving {} -> {:?}", destination, result); result }; @@ -491,173 +485,28 @@ pub async fn send_transaction_message_route<'a>( continue; } - // If it is not a state event, we can skip state-res... maybe - if value.get("state_key").is_none() { - if !db.rooms.is_joined(&pdu.sender, room_id)? { - warn!("Sender is not joined {}", pdu.kind); - resolved_map.insert(event_id, Err("User is not in this room".into())); - continue; - } + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_to_state(&pdu_id, &pdu)?; - db.rooms.append_to_state(&pdu_id, &pdu)?; + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.admin, + )?; - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - - resolved_map.insert(event_id, Ok::<(), String>(())); - continue; + for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { + db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; } - // We have a state event so we need info for state-res - let get_state_response = match send_request( - &db.globals, - body.body.origin.clone(), - ruma::api::federation::event::get_room_state::v1::Request { - room_id, - event_id: &event_id, - }, - ) - .await - { - Ok(res) => res, - // We can't hard fail because there are some valid errors, just - // keep checking PDU's - // - // As an example a possible error - // {"errcode":"M_FORBIDDEN","error":"Host not in room."} - Err(err) => { - resolved_map.insert(event_id, Err(err.to_string())); - continue; - } - }; - - let their_current_state = get_state_response - .pdus - .iter() - .chain(get_state_response.auth_chain.iter()) // add auth events - .map(|pdu| { - let (event_id, json) = crate::pdu::process_incoming_pdu(pdu); - ( - event_id.clone(), - Arc::new( - // When creating a StateEvent the event_id arg will be used - // over any found in the json and it will not use ruma::reference_hash - // to generate one - state_res::StateEvent::from_id_canon_obj(event_id, json) - .expect("valid pdu json"), - ), - ) - }) - .collect::>(); - - let our_current_state = db.rooms.room_state_full(room_id)?; - // State resolution takes care of these checks - // 4. Passes authorization rules based on the event's auth events, otherwise it is rejected. - // 5. Passes authorization rules based on the state at the event, otherwise it is rejected. - - // TODO: 6. Passes authorization rules based on the current state of the room, otherwise it is "soft failed". - match state_res::StateResolution::resolve( - room_id, - &ruma::RoomVersionId::Version6, - &[ - our_current_state - .iter() - .map(|((ev, sk), v)| ((ev.clone(), sk.to_owned()), v.event_id.clone())) - .collect::>(), - their_current_state - .iter() - .map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id())) - .collect::>(), - ], - Some( - our_current_state - .iter() - .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) - .chain( - their_current_state - .iter() - .map(|(id, ev)| (id.clone(), ev.clone())), - ) - .collect::>(), - ), - &db.rooms, - ) { - Ok(resolved) if resolved.values().any(|id| &event_id == id) => { - // If the event is older than the last event in pduid_pdu Tree then find the - // closest ancestor we know of and insert after the known ancestor by - // altering the known events pduid to = same roomID + same count bytes + 0x1 - // pushing a single byte every time a simple append cannot be done. - match db.rooms.get_latest_pduid_before( - room_id, - &pdu.prev_events, - &their_current_state, - )? { - Some(ClosestParent::Append) => { - let count = db.globals.next_count()?; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - } - Some(ClosestParent::Insert(old_count)) => { - let count = old_count; - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - // Create a new count that is after old_count but before - // the pdu appended after - pdu_id.push(1); - - db.rooms.append_pdu( - &pdu, - &value, - count, - pdu_id.into(), - &db.globals, - &db.account_data, - &db.admin, - )?; - } - _ => { - error!("Not a sequential event or no parents found"); - continue; - } - } - - resolved_map.insert(event_id, Ok::<(), String>(())); - } - // If the eventId is not found in the resolved state auth has failed - Ok(_) => { - resolved_map.insert( - event_id, - Err("This event failed authentication, not found in resolved set".into()), - ); - } - Err(e) => { - resolved_map.insert(event_id, Err(e.to_string())); - } - }; + resolved_map.insert(event_id, Ok::<(), String>(())); } Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())