From 38512d6a548826a1c1f4e7802ede439eee184f34 Mon Sep 17 00:00:00 2001 From: Johannes Becker Date: Thu, 3 Jun 2021 18:20:07 +0200 Subject: [PATCH] appservice: Add warp support --- .github/workflows/ci.yml | 37 +++- matrix_sdk/src/client.rs | 14 +- matrix_sdk_appservice/Cargo.toml | 9 +- .../examples/actix_autojoin.rs | 2 +- matrix_sdk_appservice/src/actix.rs | 35 +--- matrix_sdk_appservice/src/error.rs | 9 + matrix_sdk_appservice/src/lib.rs | 72 +++++-- matrix_sdk_appservice/src/warp.rs | 198 ++++++++++++++++++ matrix_sdk_appservice/tests/tests.rs | 182 ++++++++++------ 9 files changed, 435 insertions(+), 123 deletions(-) create mode 100644 matrix_sdk_appservice/src/warp.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5c7f11d0..3dc9f258 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,14 +94,37 @@ jobs: strategy: matrix: name: - - linux / appservice / stable - - macOS / appservice / stable + - linux / appservice / stable / actix + - macOS / appservice / stable / actix + - windows / appservice / stable-x86_64-msvc / actix + - linux / appservice / stable / warp + - macOS / appservice / stable / warp + - windows / appservice / stable-x86_64-msvc / warp include: - - name: linux / appservice / stable + - name: linux / appservice / stable / actix + cargo_args: --features actix - - name: macOS / appservice / stable + - name: macOS / appservice / stable / actix os: macOS-latest + cargo_args: --features actix + + - name: windows / appservice / stable-x86_64-msvc / actix + os: windows-latest + target: x86_64-pc-windows-msvc + cargo_args: --features actix + + - name: linux / appservice / stable / warp + cargo_args: --features warp + + - name: macOS / appservice / stable / warp + os: macOS-latest + cargo_args: --features warp + + - name: windows / appservice / stable-x86_64-msvc / warp + os: windows-latest + target: x86_64-pc-windows-msvc + cargo_args: --features warp steps: - name: Checkout @@ -119,19 +142,19 @@ jobs: uses: actions-rs/cargo@v1 with: command: clippy - args: --manifest-path matrix_sdk_appservice/Cargo.toml -- -D warnings + args: --manifest-path matrix_sdk_appservice/Cargo.toml ${{ matrix.cargo_args }} -- -D warnings - name: Build uses: actions-rs/cargo@v1 with: command: build - args: --manifest-path matrix_sdk_appservice/Cargo.toml + args: --manifest-path matrix_sdk_appservice/Cargo.toml ${{ matrix.cargo_args }} - name: Test uses: actions-rs/cargo@v1 with: command: test - args: --manifest-path matrix_sdk_appservice/Cargo.toml + args: --manifest-path matrix_sdk_appservice/Cargo.toml ${{ matrix.cargo_args }} test-features: name: ${{ matrix.name }} diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index a9102e6c..46264212 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -480,9 +480,9 @@ impl RequestConfig { /// Force sending authorization even if the endpoint does not require it. /// Default is only sending authorization if it is required - #[cfg(feature = "require_auth_for_profile_requests")] - #[cfg_attr(feature = "docs", doc(cfg(require_auth_for_profile_requests)))] - pub(crate) fn force_auth(mut self) -> Self { + #[cfg(any(feature = "require_auth_for_profile_requests", feature = "appservice"))] + #[cfg_attr(feature = "docs", doc(cfg(any(require_auth_for_profile_requests, appservice))))] + pub fn force_auth(mut self) -> Self { self.force_auth = true; self } @@ -1363,8 +1363,14 @@ impl Client { ) -> Result { info!("Registering to {}", self.homeserver().await); + #[cfg(not(feature = "appservice"))] + let config = None; + + #[cfg(feature = "appservice")] + let config = Some(RequestConfig::new().force_auth()); + let request = registration.into(); - self.send(request, None).await + self.send(request, config).await } /// Get or upload a sync filter. diff --git a/matrix_sdk_appservice/Cargo.toml b/matrix_sdk_appservice/Cargo.toml index 59e34327..351d200c 100644 --- a/matrix_sdk_appservice/Cargo.toml +++ b/matrix_sdk_appservice/Cargo.toml @@ -8,10 +8,10 @@ name = "matrix-sdk-appservice" version = "0.1.0" [features] -default = ["actix"] +default = [] actix = ["actix-rt", "actix-web"] -docs = [] +docs = ["actix", "warp"] [dependencies] actix-rt = { version = "2", optional = true } @@ -21,11 +21,13 @@ futures = "0.3" futures-util = "0.3" http = "0.2" regex = "1" -serde = "1.0.126" +serde = "1" +serde_json = "1" serde_yaml = "0.8" thiserror = "1.0" tracing = "0.1" url = "2" +warp = { version = "0.3", optional = true } matrix-sdk = { version = "0.2", path = "../matrix_sdk", default-features = false, features = ["appservice", "native-tls"] } @@ -36,7 +38,6 @@ features = ["client-api-c", "appservice-api-s", "unstable-pre-spec"] [dev-dependencies] env_logger = "0.8" mockito = "0.30" -serde_json = "1" tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros"] } tracing-subscriber = "0.2" diff --git a/matrix_sdk_appservice/examples/actix_autojoin.rs b/matrix_sdk_appservice/examples/actix_autojoin.rs index 8f848901..8c203cb0 100644 --- a/matrix_sdk_appservice/examples/actix_autojoin.rs +++ b/matrix_sdk_appservice/examples/actix_autojoin.rs @@ -35,7 +35,7 @@ impl EventHandler for AppserviceEventHandler { let user_id = UserId::try_from(event.state_key.clone()).unwrap(); let appservice = self.appservice.clone(); - appservice.register(user_id.localpart()).await.unwrap(); + appservice.register_virtual_user(user_id.localpart()).await.unwrap(); let client = appservice.virtual_user(user_id.localpart()).await.unwrap(); diff --git a/matrix_sdk_appservice/src/actix.rs b/matrix_sdk_appservice/src/actix.rs index 41530e17..9228dcf0 100644 --- a/matrix_sdk_appservice/src/actix.rs +++ b/matrix_sdk_appservice/src/actix.rs @@ -12,23 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ - convert::{TryFrom, TryInto}, - pin::Pin, -}; +use std::pin::Pin; pub use actix_web::Scope; use actix_web::{ dev::Payload, error::PayloadError, - get, - http::PathAndQuery, - put, + get, put, web::{self, BytesMut, Data}, App, FromRequest, HttpRequest, HttpResponse, HttpServer, }; use futures::Future; -use futures_util::{TryFutureExt, TryStreamExt}; +use futures_util::TryStreamExt; use ruma::api::appservice as api; use crate::{error::Error, Appservice}; @@ -112,23 +107,8 @@ impl FromRequest for IncomingRequest { let payload = payload.take(); Box::pin(async move { - let uri = request.uri().to_owned(); - - let uri = if !uri.path().starts_with("/_matrix/app/v1") { - // rename legacy routes - let mut parts = uri.into_parts(); - let path_and_query = match parts.path_and_query { - Some(path_and_query) => format!("/_matrix/app/v1{}", path_and_query), - None => "/_matrix/app/v1".to_owned(), - }; - parts.path_and_query = - Some(PathAndQuery::try_from(path_and_query).map_err(http::Error::from)?); - parts.try_into().map_err(http::Error::from)? - } else { - uri - }; - - let mut builder = http::request::Builder::new().method(request.method()).uri(uri); + let mut builder = + http::request::Builder::new().method(request.method()).uri(request.uri()); let headers = builder.headers_mut().ok_or(Error::UnknownHttpRequestBuilder)?; for (key, value) in request.headers().iter() { @@ -140,8 +120,8 @@ impl FromRequest for IncomingRequest { body.extend_from_slice(&chunk); Ok::<_, PayloadError>(body) }) - .and_then(|bytes| async move { Ok::, _>(bytes.into_iter().collect()) }) - .await?; + .await? + .into(); let access_token = match request.uri().query() { Some(query) => { @@ -157,6 +137,7 @@ impl FromRequest for IncomingRequest { }; let request = builder.body(bytes)?; + let request = crate::transform_legacy_route(request)?; Ok(IncomingRequest { access_token, diff --git a/matrix_sdk_appservice/src/error.rs b/matrix_sdk_appservice/src/error.rs index 3b59bef0..73a6bc7b 100644 --- a/matrix_sdk_appservice/src/error.rs +++ b/matrix_sdk_appservice/src/error.rs @@ -31,6 +31,9 @@ pub enum Error { #[error("no client for localpart found")] NoClientForLocalpart, + #[error("could not convert host:port to socket addr")] + HostPortToSocketAddrs, + #[error(transparent)] HttpRequest(#[from] ruma::api::error::FromHttpRequestError), @@ -61,6 +64,9 @@ pub enum Error { #[error(transparent)] SerdeYaml(#[from] serde_yaml::Error), + #[error(transparent)] + SerdeJson(#[from] serde_json::Error), + #[cfg(feature = "actix")] #[error(transparent)] Actix(#[from] actix_web::Error), @@ -72,3 +78,6 @@ pub enum Error { #[cfg(feature = "actix")] impl actix_web::error::ResponseError for Error {} + +#[cfg(feature = "warp")] +impl warp::reject::Reject for Error {} diff --git a/matrix_sdk_appservice/src/lib.rs b/matrix_sdk_appservice/src/lib.rs index 5e3792ac..db53a67d 100644 --- a/matrix_sdk_appservice/src/lib.rs +++ b/matrix_sdk_appservice/src/lib.rs @@ -36,10 +36,10 @@ //! # //! # use matrix_sdk::{async_trait, EventHandler}; //! # -//! # struct AppserviceEventHandler; +//! # struct MyEventHandler; //! # //! # #[async_trait] -//! # impl EventHandler for AppserviceEventHandler {} +//! # impl EventHandler for MyEventHandler {} //! # //! use matrix_sdk_appservice::{Appservice, AppserviceRegistration}; //! @@ -59,7 +59,7 @@ //! ")?; //! //! let mut appservice = Appservice::new(homeserver_url, server_name, registration).await?; -//! appservice.set_event_handler(Box::new(AppserviceEventHandler)).await?; +//! appservice.set_event_handler(Box::new(MyEventHandler)).await?; //! //! let (host, port) = appservice.registration().get_host_and_port()?; //! appservice.run(host, port).await?; @@ -74,8 +74,8 @@ //! [matrix-org/matrix-rust-sdk#228]: https://github.com/matrix-org/matrix-rust-sdk/issues/228 //! [examples directory]: https://github.com/matrix-org/matrix-rust-sdk/tree/master/matrix_sdk_appservice/examples -#[cfg(not(any(feature = "actix",)))] -compile_error!("one webserver feature must be enabled. available ones: `actix`"); +#[cfg(not(any(feature = "actix", feature = "warp")))] +compile_error!("one webserver feature must be enabled. available ones: `actix`, `warp`"); use std::{ convert::{TryFrom, TryInto}, @@ -86,8 +86,12 @@ use std::{ }; use dashmap::DashMap; -use http::Uri; -use matrix_sdk::{reqwest::Url, Client, ClientConfig, EventHandler, HttpError, Session}; +pub use error::Error; +use http::{uri::PathAndQuery, Uri}; +pub use matrix_sdk as sdk; +use matrix_sdk::{ + reqwest::Url, Bytes, Client, ClientConfig, EventHandler, HttpError, RequestConfig, Session, +}; use regex::Regex; #[doc(inline)] pub use ruma::api::appservice as api; @@ -110,8 +114,9 @@ use tracing::warn; #[cfg(feature = "actix")] mod actix; mod error; +#[cfg(feature = "warp")] +mod warp; -pub use error::Error; pub type Result = std::result::Result; pub type Host = String; pub type Port = u16; @@ -356,14 +361,18 @@ impl Appservice { /// /// * `localpart` - The localpart of the user to register. Must be covered /// by the namespaces in the [`Registration`] in order to succeed. - pub async fn register(&mut self, localpart: impl AsRef) -> Result<()> { + pub async fn register_virtual_user(&self, localpart: impl AsRef) -> Result<()> { let request = assign!(RegistrationRequest::new(), { username: Some(localpart.as_ref()), login_type: Some(&LoginType::ApplicationService), }); let client = self.get_cached_client(None)?; - match client.register(request).await { + + // TODO: use `client.register()` instead + // blocked by: https://github.com/seanmonstar/warp/pull/861 + let config = Some(RequestConfig::new().force_auth()); + match client.send(request, config).await { Ok(_) => (), Err(error) => match error { matrix_sdk::Error::Http(HttpError::UiaaError(FromHttpResponseError::Http( @@ -412,13 +421,20 @@ impl Appservice { Ok(false) } - /// Service to register on an Actix `App` + /// [`actix_web::Scope`] to be used with [`actix_web::App::service()`] #[cfg(feature = "actix")] #[cfg_attr(docs, doc(cfg(feature = "actix")))] pub fn actix_service(&self) -> actix::Scope { actix::get_scope().data(self.clone()) } + /// [`::warp::Filter`] to be used as warp serve route + #[cfg(feature = "warp")] + #[cfg_attr(docs, doc(cfg(feature = "warp")))] + pub fn warp_filter(&self) -> ::warp::filters::BoxedFilter<(impl ::warp::Reply,)> { + crate::warp::warp_filter(self.clone()) + } + /// Convenience method that runs an http server depending on the selected /// server feature /// @@ -431,7 +447,39 @@ impl Appservice { Ok(()) } - #[cfg(not(any(feature = "actix",)))] + #[cfg(feature = "warp")] + { + warp::run_server(self.clone(), host, port).await?; + Ok(()) + } + + #[cfg(not(any(feature = "actix", feature = "warp",)))] unreachable!() } } + +/// Transforms [legacy routes] to the correct route so ruma can parse them +/// properly +/// +/// [legacy routes]: https://matrix.org/docs/spec/application_service/r0.1.2#legacy-routes +pub(crate) fn transform_legacy_route( + mut request: http::Request, +) -> Result> { + let uri = request.uri().to_owned(); + + if !uri.path().starts_with("/_matrix/app/v1") { + // rename legacy routes + let mut parts = uri.into_parts(); + let path_and_query = match parts.path_and_query { + Some(path_and_query) => format!("/_matrix/app/v1{}", path_and_query), + None => "/_matrix/app/v1".to_owned(), + }; + parts.path_and_query = + Some(PathAndQuery::try_from(path_and_query).map_err(http::Error::from)?); + let uri = parts.try_into().map_err(http::Error::from)?; + + *request.uri_mut() = uri; + } + + Ok(request) +} diff --git a/matrix_sdk_appservice/src/warp.rs b/matrix_sdk_appservice/src/warp.rs new file mode 100644 index 00000000..ff9cc7fc --- /dev/null +++ b/matrix_sdk_appservice/src/warp.rs @@ -0,0 +1,198 @@ +// Copyright 2021 Famedly GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{net::ToSocketAddrs, result::Result as StdResult}; + +use futures::TryFutureExt; +use matrix_sdk::Bytes; +use serde::Serialize; +use warp::{filters::BoxedFilter, path::FullPath, Filter, Rejection, Reply}; + +use crate::{Appservice, Error, Result}; + +pub async fn run_server( + appservice: Appservice, + host: impl Into, + port: impl Into, +) -> Result<()> { + let routes = warp_filter(appservice); + + let mut addr = format!("{}:{}", host.into(), port.into()).to_socket_addrs()?; + if let Some(addr) = addr.next() { + warp::serve(routes).run(addr).await; + Ok(()) + } else { + Err(Error::HostPortToSocketAddrs) + } +} + +pub fn warp_filter(appservice: Appservice) -> BoxedFilter<(impl Reply,)> { + warp::any() + .and(filters::valid_access_token(appservice.registration().hs_token.clone())) + .and(filters::transactions(appservice)) + .or(filters::users()) + .or(filters::rooms()) + .recover(handle_rejection) + .boxed() +} + +mod filters { + use super::*; + + pub fn users() -> BoxedFilter<(impl Reply,)> { + warp::get() + .and( + warp::path!("_matrix" / "app" / "v1" / "users" / String) + // legacy route + .or(warp::path!("users" / String)) + .unify(), + ) + .and_then(handlers::user) + .boxed() + } + + pub fn rooms() -> BoxedFilter<(impl Reply,)> { + warp::get() + .and( + warp::path!("_matrix" / "app" / "v1" / "rooms" / String) + // legacy route + .or(warp::path!("rooms" / String)) + .unify(), + ) + .and_then(handlers::room) + .boxed() + } + + pub fn transactions(appservice: Appservice) -> BoxedFilter<(impl Reply,)> { + warp::put() + .and( + warp::path!("_matrix" / "app" / "v1" / "transactions" / String) + // legacy route + .or(warp::path!("transactions" / String)) + .unify(), + ) + .and(with_appservice(appservice)) + .and(http_request().and_then(|request| async move { + let request = crate::transform_legacy_route(request).map_err(Error::from)?; + Ok::, Rejection>(request) + })) + .and_then(handlers::transaction) + .boxed() + } + + pub fn with_appservice( + appservice: Appservice, + ) -> impl Filter + Clone { + warp::any().map(move || appservice.clone()) + } + + pub fn valid_access_token(token: String) -> BoxedFilter<()> { + warp::any() + .map(move || token.clone()) + .and(warp::query::raw()) + .and_then(|token: String, query: String| async move { + let query: Vec<(String, String)> = + matrix_sdk::urlencoded::from_str(&query).map_err(Error::from)?; + + if query.into_iter().any(|(key, value)| key == "access_token" && value == token) { + Ok::<(), Rejection>(()) + } else { + Err(warp::reject::custom(Unauthorized)) + } + }) + .untuple_one() + .boxed() + } + + pub fn http_request() -> impl Filter,), Error = Rejection> + Copy + { + // TODO: extract `hyper::Request` instead + // blocked by https://github.com/seanmonstar/warp/issues/139 + warp::any() + .and(warp::method()) + .and(warp::filters::path::full()) + .and(warp::filters::query::raw()) + .and(warp::header::headers_cloned()) + .and(warp::body::bytes()) + .and_then(|method, path: FullPath, query, headers, bytes| async move { + let uri = http::uri::Builder::new() + .path_and_query(format!("{}?{}", path.as_str(), query)) + .build() + .map_err(Error::from)?; + + let mut request = http::Request::builder() + .method(method) + .uri(uri) + .body(bytes) + .map_err(Error::from)?; + + *request.headers_mut() = headers; + + Ok::, Rejection>(request) + }) + } +} + +mod handlers { + use super::*; + + pub async fn user(_: String) -> StdResult { + Ok(warp::reply::json(&String::from("{}"))) + } + + pub async fn room(_: String) -> StdResult { + Ok(warp::reply::json(&String::from("{}"))) + } + + pub async fn transaction( + _: String, + appservice: Appservice, + request: http::Request, + ) -> StdResult { + let incoming_transaction: matrix_sdk::api_appservice::event::push_events::v1::IncomingRequest = + matrix_sdk::IncomingRequest::try_from_http_request(request).map_err(Error::from)?; + + let client = appservice.get_cached_client(None)?; + client.receive_transaction(incoming_transaction).map_err(Error::from).await?; + + Ok(warp::reply::json(&String::from("{}"))) + } +} + +#[derive(Debug)] +struct Unauthorized; + +impl warp::reject::Reject for Unauthorized {} + +#[derive(Serialize)] +struct ErrorMessage { + code: u16, + message: String, +} + +pub async fn handle_rejection( + err: Rejection, +) -> std::result::Result { + let mut code = http::StatusCode::INTERNAL_SERVER_ERROR; + let mut message = "INTERNAL_SERVER_ERROR"; + + if err.find::().is_some() || err.find::().is_some() { + code = http::StatusCode::UNAUTHORIZED; + message = "UNAUTHORIZED"; + } + + let json = warp::reply::json(&ErrorMessage { code: code.as_u16(), message: message.into() }); + + Ok(warp::reply::with_status(json, code)) +} diff --git a/matrix_sdk_appservice/tests/tests.rs b/matrix_sdk_appservice/tests/tests.rs index e8277f17..4b7b6838 100644 --- a/matrix_sdk_appservice/tests/tests.rs +++ b/matrix_sdk_appservice/tests/tests.rs @@ -13,6 +13,8 @@ use matrix_sdk::{ use matrix_sdk_appservice::*; use matrix_sdk_test::async_test; use serde_json::json; +#[cfg(feature = "warp")] +use warp::Reply; fn registration_string() -> String { include_str!("../tests/registration.yaml").to_owned() @@ -63,12 +65,9 @@ fn member_json() -> serde_json::Value { } #[async_test] -async fn test_transactions() -> Result<()> { +async fn test_put_transaction() -> Result<()> { let appservice = appservice(None).await?; - #[cfg(feature = "actix")] - let app = actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; - let transactions = r#"{ "events": [ { @@ -78,40 +77,93 @@ async fn test_transactions() -> Result<()> { ] }"#; - let transactions: serde_json::Value = serde_json::from_str(transactions).unwrap(); + let uri = "/_matrix/app/v1/transactions/1?access_token=hs_token"; + let transactions: serde_json::Value = serde_json::from_str(transactions)?; + + #[cfg(feature = "warp")] + let status = warp::test::request() + .method("PUT") + .path(uri) + .json(&transactions) + .filter(&appservice.warp_filter()) + .await + .unwrap() + .into_response() + .status(); #[cfg(feature = "actix")] - { - let req = actix_test::TestRequest::put() - .uri("/_matrix/app/v1/transactions/1?access_token=hs_token") - .set_json(&transactions) - .to_request(); + let status = { + let app = + actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; - let resp = actix_test::call_service(&app, req).await; + let req = actix_test::TestRequest::put().uri(uri).set_json(&transactions).to_request(); - assert_eq!(resp.status(), 200); - } + actix_test::call_service(&app, req).await.status() + }; + + assert_eq!(status, 200); Ok(()) } #[async_test] -async fn test_users() -> Result<()> { +async fn test_get_user() -> Result<()> { let appservice = appservice(None).await?; + let uri = "/_matrix/app/v1/users/%40_botty_1%3Adev.famedly.local?access_token=hs_token"; + + #[cfg(feature = "warp")] + let status = warp::test::request() + .method("GET") + .path(uri) + .filter(&appservice.warp_filter()) + .await + .unwrap() + .into_response() + .status(); + #[cfg(feature = "actix")] - { + let status = { let app = actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; - let req = actix_test::TestRequest::get() - .uri("/_matrix/app/v1/users/%40_botty_1%3Adev.famedly.local?access_token=hs_token") - .to_request(); + let req = actix_test::TestRequest::get().uri(uri).to_request(); - let resp = actix_test::call_service(&app, req).await; + actix_test::call_service(&app, req).await.status() + }; - assert_eq!(resp.status(), 200); - } + assert_eq!(status, 200); + + Ok(()) +} + +#[async_test] +async fn test_get_room() -> Result<()> { + let appservice = appservice(None).await?; + + let uri = "/_matrix/app/v1/rooms/%23magicforest%3Aexample.com?access_token=hs_token"; + + #[cfg(feature = "warp")] + let status = warp::test::request() + .method("GET") + .path(uri) + .filter(&appservice.warp_filter()) + .await + .unwrap() + .into_response() + .status(); + + #[cfg(feature = "actix")] + let status = { + let app = + actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; + + let req = actix_test::TestRequest::get().uri(uri).to_request(); + + actix_test::call_service(&app, req).await.status() + }; + + assert_eq!(status, 200); Ok(()) } @@ -120,9 +172,6 @@ async fn test_users() -> Result<()> { async fn test_invalid_access_token() -> Result<()> { let appservice = appservice(None).await?; - #[cfg(feature = "actix")] - let app = actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; - let transactions = r#"{ "events": [ { @@ -133,18 +182,30 @@ async fn test_invalid_access_token() -> Result<()> { }"#; let transactions: serde_json::Value = serde_json::from_str(transactions).unwrap(); + let uri = "/_matrix/app/v1/transactions/1?access_token=invalid_token"; + + #[cfg(feature = "warp")] + let status = warp::test::request() + .method("PUT") + .path(uri) + .json(&transactions) + .filter(&appservice.warp_filter()) + .await + .unwrap() + .into_response() + .status(); #[cfg(feature = "actix")] - { - let req = actix_test::TestRequest::put() - .uri("/_matrix/app/v1/transactions/1?access_token=invalid_token") - .set_json(&transactions) - .to_request(); + let status = { + let app = + actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; - let resp = actix_test::call_service(&app, req).await; + let req = actix_test::TestRequest::put().uri(uri).set_json(&transactions).to_request(); - assert_eq!(resp.status(), 401); - } + actix_test::call_service(&app, req).await.status() + }; + + assert_eq!(status, 401); Ok(()) } @@ -153,9 +214,6 @@ async fn test_invalid_access_token() -> Result<()> { async fn test_no_access_token() -> Result<()> { let appservice = appservice(None).await?; - #[cfg(feature = "actix")] - let app = actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; - let transactions = r#"{ "events": [ { @@ -167,12 +225,29 @@ async fn test_no_access_token() -> Result<()> { let transactions: serde_json::Value = serde_json::from_str(transactions).unwrap(); + let uri = "/_matrix/app/v1/transactions/1"; + + #[cfg(feature = "warp")] + { + let status = warp::test::request() + .method("PUT") + .path(uri) + .json(&transactions) + .filter(&appservice.warp_filter()) + .await + .unwrap() + .into_response() + .status(); + + assert_eq!(status, 401); + } + #[cfg(feature = "actix")] { - let req = actix_test::TestRequest::put() - .uri("/_matrix/app/v1/transactions/1") - .set_json(&transactions) - .to_request(); + let app = + actix_test::init_service(ActixApp::new().service(appservice.actix_service())).await; + + let req = actix_test::TestRequest::put().uri(uri).set_json(&transactions).to_request(); let resp = actix_test::call_service(&app, req).await; @@ -219,35 +294,6 @@ async fn test_event_handler() -> Result<()> { Ok(()) } -#[async_test] -async fn test_transaction() -> Result<()> { - let appservice = appservice(None).await?; - - let event = serde_json::from_value::(member_json()).unwrap(); - let event: Raw = AnyRoomEvent::State(event).into(); - let events = vec![event]; - - let incoming = api_appservice::event::push_events::v1::IncomingRequest::new( - "any_txn_id".to_owned(), - events, - ); - - appservice.get_cached_client(None)?.receive_transaction(incoming).await?; - - Ok(()) -} - -#[async_test] -async fn test_verify_hs_token() -> Result<()> { - let appservice = appservice(None).await?; - - let registration = appservice.registration(); - - assert!(appservice.compare_hs_token(®istration.hs_token)); - - Ok(()) -} - mod registration { use super::*;