appservice: Drop actix
parent
df71de5af2
commit
f454d407af
|
@ -94,19 +94,10 @@ jobs:
|
|||
strategy:
|
||||
matrix:
|
||||
name:
|
||||
- linux / appservice / stable / actix
|
||||
- macOS / appservice / stable / actix
|
||||
- linux / appservice / stable / warp
|
||||
- macOS / appservice / stable / warp
|
||||
|
||||
include:
|
||||
- name: linux / appservice / stable / actix
|
||||
cargo_args: --no-default-features --features actix
|
||||
|
||||
- name: macOS / appservice / stable / actix
|
||||
os: macOS-latest
|
||||
cargo_args: --no-default-features --features actix
|
||||
|
||||
- name: linux / appservice / stable / warp
|
||||
cargo_args: --features warp
|
||||
|
||||
|
|
|
@ -9,13 +9,10 @@ version = "0.1.0"
|
|||
|
||||
[features]
|
||||
default = ["warp"]
|
||||
actix = ["actix-rt", "actix-web"]
|
||||
|
||||
docs = ["actix", "warp"]
|
||||
docs = ["warp"]
|
||||
|
||||
[dependencies]
|
||||
actix-rt = { version = "2", optional = true }
|
||||
actix-web = { version = "4.0.0-beta.6", optional = true }
|
||||
dashmap = "4"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
|
|
|
@ -70,19 +70,8 @@ pub enum Error {
|
|||
#[cfg(feature = "warp")]
|
||||
#[error("warp rejection: {0}")]
|
||||
WarpRejection(String),
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
#[error(transparent)]
|
||||
Actix(#[from] actix_web::Error),
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
#[error(transparent)]
|
||||
ActixPayload(#[from] actix_web::error::PayloadError),
|
||||
}
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
impl actix_web::error::ResponseError for Error {}
|
||||
|
||||
#[cfg(feature = "warp")]
|
||||
impl warp::reject::Reject for Error {}
|
||||
|
||||
|
|
|
@ -74,8 +74,8 @@
|
|||
//! [matrix-org/matrix-rust-sdk#228]: https://github.com/matrix-org/matrix-rust-sdk/issues/228
|
||||
//! [examples directory]: https://github.com/matrix-org/matrix-rust-sdk/tree/master/matrix_sdk_appservice/examples
|
||||
|
||||
#[cfg(not(any(feature = "actix", feature = "warp")))]
|
||||
compile_error!("one webserver feature must be enabled. available ones: `actix`, `warp`");
|
||||
#[cfg(not(any(feature = "warp")))]
|
||||
compile_error!("one webserver feature must be enabled. available ones: `warp`");
|
||||
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
|
@ -441,24 +441,6 @@ impl AppService {
|
|||
Ok(false)
|
||||
}
|
||||
|
||||
/// Returns a closure to be used with [`actix_web::App::configure()`]
|
||||
///
|
||||
/// Note that if you handle any of the [application-service-specific
|
||||
/// routes], including the legacy routes, you will break the appservice
|
||||
/// functionality.
|
||||
///
|
||||
/// [application-service-specific routes]: https://spec.matrix.org/unstable/application-service-api/#legacy-routes
|
||||
#[cfg(feature = "actix")]
|
||||
#[cfg_attr(docs, doc(cfg(feature = "actix")))]
|
||||
pub fn actix_configure(&self) -> impl FnOnce(&mut actix_web::web::ServiceConfig) {
|
||||
let appservice = self.clone();
|
||||
|
||||
move |config| {
|
||||
config.data(appservice);
|
||||
webserver::actix::configure(config);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [`warp::Filter`] to be used as [`warp::serve()`] route
|
||||
///
|
||||
/// Note that if you handle any of the [application-service-specific
|
||||
|
@ -482,19 +464,13 @@ impl AppService {
|
|||
let port = port.into();
|
||||
info!("Starting AppService on {}:{}", &host, &port);
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
{
|
||||
webserver::actix::run_server(self.clone(), host, port).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "warp")]
|
||||
{
|
||||
webserver::warp::run_server(self.clone(), host, port).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "actix", feature = "warp",)))]
|
||||
#[cfg(not(any(feature = "warp",)))]
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,149 +0,0 @@
|
|||
// Copyright 2021 Famedly GmbH
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
pub use actix_web::Scope;
|
||||
use actix_web::{
|
||||
dev::Payload,
|
||||
error::PayloadError,
|
||||
get, put,
|
||||
web::{self, BytesMut, Data},
|
||||
App, FromRequest, HttpRequest, HttpResponse, HttpServer,
|
||||
};
|
||||
use futures::Future;
|
||||
use futures_util::TryStreamExt;
|
||||
use ruma::api::appservice as api;
|
||||
|
||||
use crate::{error::Error, AppService};
|
||||
|
||||
pub async fn run_server(
|
||||
appservice: AppService,
|
||||
host: impl Into<String>,
|
||||
port: impl Into<u16>,
|
||||
) -> Result<(), Error> {
|
||||
HttpServer::new(move || App::new().configure(appservice.actix_configure()))
|
||||
.bind((host.into(), port.into()))?
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn configure(config: &mut actix_web::web::ServiceConfig) {
|
||||
// also handles legacy routes
|
||||
config.service(push_transactions).service(query_user_id).service(query_room_alias).service(
|
||||
web::scope("/_matrix/app/v1")
|
||||
.service(push_transactions)
|
||||
.service(query_user_id)
|
||||
.service(query_room_alias),
|
||||
);
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
#[put("/transactions/{txn_id}")]
|
||||
async fn push_transactions(
|
||||
request: IncomingRequest<api::event::push_events::v1::IncomingRequest>,
|
||||
appservice: Data<AppService>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
if !appservice.compare_hs_token(request.access_token) {
|
||||
return Ok(HttpResponse::Unauthorized().finish());
|
||||
}
|
||||
|
||||
appservice.get_cached_client(None)?.receive_transaction(request.incoming).await?;
|
||||
|
||||
Ok(HttpResponse::Ok().json("{}"))
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
#[get("/users/{user_id}")]
|
||||
async fn query_user_id(
|
||||
request: IncomingRequest<api::query::query_user_id::v1::IncomingRequest>,
|
||||
appservice: Data<AppService>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
if !appservice.compare_hs_token(request.access_token) {
|
||||
return Ok(HttpResponse::Unauthorized().finish());
|
||||
}
|
||||
|
||||
Ok(HttpResponse::Ok().json("{}"))
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
#[get("/rooms/{room_alias}")]
|
||||
async fn query_room_alias(
|
||||
request: IncomingRequest<api::query::query_room_alias::v1::IncomingRequest>,
|
||||
appservice: Data<AppService>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
if !appservice.compare_hs_token(request.access_token) {
|
||||
return Ok(HttpResponse::Unauthorized().finish());
|
||||
}
|
||||
|
||||
Ok(HttpResponse::Ok().json("{}"))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct IncomingRequest<T> {
|
||||
access_token: String,
|
||||
incoming: T,
|
||||
}
|
||||
|
||||
impl<T: ruma::api::IncomingRequest> FromRequest for IncomingRequest<T> {
|
||||
type Error = Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
|
||||
type Config = ();
|
||||
|
||||
fn from_request(request: &HttpRequest, payload: &mut Payload) -> Self::Future {
|
||||
let request = request.to_owned();
|
||||
let payload = payload.take();
|
||||
|
||||
Box::pin(async move {
|
||||
let mut builder =
|
||||
http::request::Builder::new().method(request.method()).uri(request.uri());
|
||||
|
||||
let headers = builder.headers_mut().ok_or(Error::UnknownHttpRequestBuilder)?;
|
||||
for (key, value) in request.headers().iter() {
|
||||
headers.append(key, value.to_owned());
|
||||
}
|
||||
|
||||
let bytes = payload
|
||||
.try_fold(BytesMut::new(), |mut body, chunk| async move {
|
||||
body.extend_from_slice(&chunk);
|
||||
Ok::<_, PayloadError>(body)
|
||||
})
|
||||
.await?
|
||||
.into();
|
||||
|
||||
let access_token = match request.uri().query() {
|
||||
Some(query) => {
|
||||
let query: Vec<(String, String)> = ruma::serde::urlencoded::from_str(query)?;
|
||||
query.into_iter().find(|(key, _)| key == "access_token").map(|(_, value)| value)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let access_token = match access_token {
|
||||
Some(access_token) => access_token,
|
||||
None => return Err(Error::MissingAccessToken),
|
||||
};
|
||||
|
||||
let request = builder.body(bytes)?;
|
||||
let request = crate::transform_legacy_route(request)?;
|
||||
|
||||
Ok(IncomingRequest {
|
||||
access_token,
|
||||
incoming: ruma::api::IncomingRequest::try_from_http_request(request)?,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,4 +1,2 @@
|
|||
#[cfg(feature = "actix")]
|
||||
pub mod actix;
|
||||
#[cfg(feature = "warp")]
|
||||
pub mod warp;
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
use std::sync::{Arc, Mutex};
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
use actix_web::{test as actix_test, App as ActixApp, HttpResponse};
|
||||
use matrix_sdk::{
|
||||
async_trait,
|
||||
room::Room,
|
||||
|
@ -24,7 +22,7 @@ fn registration_string() -> String {
|
|||
async fn appservice(registration: Option<Registration>) -> Result<AppService> {
|
||||
// env::set_var(
|
||||
// "RUST_LOG",
|
||||
// "mockito=debug,matrix_sdk=debug,ruma=debug,actix_web=debug,warp=debug",
|
||||
// "mockito=debug,matrix_sdk=debug,ruma=debug,warp=debug",
|
||||
// );
|
||||
let _ = tracing_subscriber::fmt::try_init();
|
||||
|
||||
|
@ -99,16 +97,6 @@ async fn test_put_transaction() -> Result<()> {
|
|||
.into_response()
|
||||
.status();
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
let status = {
|
||||
let app =
|
||||
actix_test::init_service(ActixApp::new().configure(appservice.actix_configure())).await;
|
||||
|
||||
let req = actix_test::TestRequest::put().uri(uri).set_json(&transaction).to_request();
|
||||
|
||||
actix_test::call_service(&app, req).await.status()
|
||||
};
|
||||
|
||||
assert_eq!(status, 200);
|
||||
|
||||
Ok(())
|
||||
|
@ -130,16 +118,6 @@ async fn test_get_user() -> Result<()> {
|
|||
.into_response()
|
||||
.status();
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
let status = {
|
||||
let app =
|
||||
actix_test::init_service(ActixApp::new().configure(appservice.actix_configure())).await;
|
||||
|
||||
let req = actix_test::TestRequest::get().uri(uri).to_request();
|
||||
|
||||
actix_test::call_service(&app, req).await.status()
|
||||
};
|
||||
|
||||
assert_eq!(status, 200);
|
||||
|
||||
Ok(())
|
||||
|
@ -161,16 +139,6 @@ async fn test_get_room() -> Result<()> {
|
|||
.into_response()
|
||||
.status();
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
let status = {
|
||||
let app =
|
||||
actix_test::init_service(ActixApp::new().configure(appservice.actix_configure())).await;
|
||||
|
||||
let req = actix_test::TestRequest::get().uri(uri).to_request();
|
||||
|
||||
actix_test::call_service(&app, req).await.status()
|
||||
};
|
||||
|
||||
assert_eq!(status, 200);
|
||||
|
||||
Ok(())
|
||||
|
@ -197,16 +165,6 @@ async fn test_invalid_access_token() -> Result<()> {
|
|||
.into_response()
|
||||
.status();
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
let status = {
|
||||
let app =
|
||||
actix_test::init_service(ActixApp::new().configure(appservice.actix_configure())).await;
|
||||
|
||||
let req = actix_test::TestRequest::put().uri(uri).set_json(&transaction).to_request();
|
||||
|
||||
actix_test::call_service(&app, req).await.status()
|
||||
};
|
||||
|
||||
assert_eq!(status, 401);
|
||||
|
||||
Ok(())
|
||||
|
@ -237,20 +195,6 @@ async fn test_no_access_token() -> Result<()> {
|
|||
assert_eq!(status, 401);
|
||||
}
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
{
|
||||
let app =
|
||||
actix_test::init_service(ActixApp::new().configure(appservice.actix_configure())).await;
|
||||
|
||||
let req = actix_test::TestRequest::put().uri(uri).set_json(&transaction).to_request();
|
||||
|
||||
let resp = actix_test::call_service(&app, req).await;
|
||||
|
||||
// TODO: this should actually return a 401 but is 500 because something in the
|
||||
// extractor fails
|
||||
assert_eq!(resp.status(), 500);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -296,16 +240,6 @@ async fn test_event_handler() -> Result<()> {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
{
|
||||
let app =
|
||||
actix_test::init_service(ActixApp::new().configure(appservice.actix_configure())).await;
|
||||
|
||||
let req = actix_test::TestRequest::put().uri(uri).set_json(&transaction).to_request();
|
||||
|
||||
actix_test::call_service(&app, req).await;
|
||||
};
|
||||
|
||||
let on_room_member_called = *example.on_state_member.lock().unwrap();
|
||||
assert!(on_room_member_called);
|
||||
|
||||
|
@ -332,20 +266,6 @@ async fn test_unrelated_path() -> Result<()> {
|
|||
response.status()
|
||||
};
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
let status = {
|
||||
let app = actix_test::init_service(
|
||||
ActixApp::new()
|
||||
.configure(appservice.actix_configure())
|
||||
.route("/unrelated", actix_web::web::get().to(HttpResponse::Ok)),
|
||||
)
|
||||
.await;
|
||||
|
||||
let req = actix_test::TestRequest::get().uri("/unrelated").to_request();
|
||||
|
||||
actix_test::call_service(&app, req).await.status()
|
||||
};
|
||||
|
||||
assert_eq!(status, 200);
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in New Issue