Merge branch 'feat/appservice'

master
Damir Jelić 2021-05-11 09:50:26 +02:00
commit 4c09c6272b
17 changed files with 1188 additions and 30 deletions

View File

@ -52,17 +52,17 @@ jobs:
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: clippy command: clippy
args: --all-targets -- -D warnings args: --workspace --exclude matrix-sdk-appservice --all-targets -- -D warnings
- name: Clippy without default features - name: Clippy without default features
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: clippy command: clippy
# TODO: add `--all-targets` once all warnings in examples are resolved # TODO: add `--all-targets` once all warnings in examples are resolved
args: --no-default-features --features native-tls -- -D warnings args: --workspace --exclude matrix-sdk-appservice --no-default-features --features native-tls -- -D warnings
check-wasm: check-wasm:
name: Check if WASM support compiles name: linux / WASM
needs: [clippy] needs: [clippy]
runs-on: ubuntu-latest runs-on: ubuntu-latest
@ -86,6 +86,58 @@ jobs:
cd matrix_sdk/examples/wasm_command_bot cd matrix_sdk/examples/wasm_command_bot
cargo check --target wasm32-unknown-unknown cargo check --target wasm32-unknown-unknown
test-appservice:
name: ${{ matrix.name }}
needs: [clippy]
runs-on: ${{ matrix.os || 'ubuntu-latest' }}
strategy:
matrix:
name:
- linux / appservice / stable
- macOS / appservice / stable
- windows / appservice / stable-x86_64-msvc
include:
- name: linux / appservice / stable
- name: macOS / appservice / stable
os: macOS-latest
- name: windows / appservice / stable-x86_64-msvc
os: windows-latest
target: x86_64-pc-windows-msvc
steps:
- name: Checkout
uses: actions/checkout@v1
- name: Install rust
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust || 'stable' }}
target: ${{ matrix.target }}
profile: minimal
override: true
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --manifest-path matrix_sdk_appservice/Cargo.toml -- -D warnings
- name: Build
uses: actions-rs/cargo@v1
with:
command: build
args: --manifest-path matrix_sdk_appservice/Cargo.toml
- name: Test
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path matrix_sdk_appservice/Cargo.toml
test-features: test-features:
name: ${{ matrix.name }} name: ${{ matrix.name }}
needs: [clippy] needs: [clippy]
@ -198,8 +250,10 @@ jobs:
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: build command: build
args: --workspace --exclude matrix-sdk-appservice
- name: Test - name: Test
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: test command: test
args: --workspace --exclude matrix-sdk-appservice

View File

@ -6,4 +6,5 @@ members = [
"matrix_sdk_test_macros", "matrix_sdk_test_macros",
"matrix_sdk_crypto", "matrix_sdk_crypto",
"matrix_sdk_common", "matrix_sdk_common",
"matrix_sdk_appservice"
] ]

View File

@ -26,6 +26,7 @@ rustls-tls = ["reqwest/rustls-tls"]
socks = ["reqwest/socks"] socks = ["reqwest/socks"]
sso_login = ["warp", "rand", "tokio-stream"] sso_login = ["warp", "rand", "tokio-stream"]
require_auth_for_profile_requests = [] require_auth_for_profile_requests = []
appservice = ["matrix-sdk-common/appservice", "serde_yaml"]
docs = ["encryption", "sled_cryptostore", "sled_state_store", "sso_login"] docs = ["encryption", "sled_cryptostore", "sled_state_store", "sso_login"]
@ -40,6 +41,7 @@ url = "2.2.0"
zeroize = "1.2.0" zeroize = "1.2.0"
mime = "0.3.16" mime = "0.3.16"
rand = { version = "0.8.2", optional = true } rand = { version = "0.8.2", optional = true }
serde_yaml = { version = "0.8", optional = true }
bytes = "1.0.1" bytes = "1.0.1"
matrix-sdk-common = { version = "0.2.0", path = "../matrix_sdk_common" } matrix-sdk-common = { version = "0.2.0", path = "../matrix_sdk_common" }
@ -93,6 +95,7 @@ tracing-subscriber = "0.2.15"
tempfile = "3.2.0" tempfile = "3.2.0"
mockito = "0.29.0" mockito = "0.29.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
matrix-sdk-common = { version = "0.2.0", path = "../matrix_sdk_common" }
[[example]] [[example]]
name = "emoji_verification" name = "emoji_verification"

View File

@ -405,6 +405,8 @@ pub struct RequestConfig {
pub(crate) retry_limit: Option<u64>, pub(crate) retry_limit: Option<u64>,
pub(crate) retry_timeout: Option<Duration>, pub(crate) retry_timeout: Option<Duration>,
pub(crate) force_auth: bool, pub(crate) force_auth: bool,
#[cfg(feature = "appservice")]
pub(crate) assert_identity: bool,
} }
#[cfg(not(tarpaulin_include))] #[cfg(not(tarpaulin_include))]
@ -426,6 +428,8 @@ impl Default for RequestConfig {
retry_limit: Default::default(), retry_limit: Default::default(),
retry_timeout: Default::default(), retry_timeout: Default::default(),
force_auth: false, force_auth: false,
#[cfg(feature = "appservice")]
assert_identity: false,
} }
} }
} }
@ -464,10 +468,23 @@ impl RequestConfig {
/// Force sending authorization even if the endpoint does not require it. Default is only /// Force sending authorization even if the endpoint does not require it. Default is only
/// sending authorization if it is required /// sending authorization if it is required
#[cfg(feature = "require_auth_for_profile_requests")] #[cfg(feature = "require_auth_for_profile_requests")]
#[cfg_attr(feature = "docs", doc(cfg(require_auth_for_profile_requests)))]
pub(crate) fn force_auth(mut self) -> Self { pub(crate) fn force_auth(mut self) -> Self {
self.force_auth = true; self.force_auth = true;
self self
} }
/// All outgoing http requests will have a GET query key-value appended with `user_id` being
/// the key and the `user_id` from the `Session` being the value. Will error if there's no
/// `Session`. This is called [identity assertion] in the Matrix Appservice Spec
///
/// [identity assertion]: https://spec.matrix.org/unstable/application-service-api/#identity-assertion
#[cfg(feature = "appservice")]
#[cfg_attr(feature = "docs", doc(cfg(appservice)))]
pub fn assert_identity(mut self) -> Self {
self.assert_identity = true;
self
}
} }
impl Client { impl Client {
@ -521,6 +538,31 @@ impl Client {
}) })
} }
/// Process a [transaction] received from the homeserver
///
/// # Arguments
///
/// * `incoming_transaction` - The incoming transaction received from the homeserver.
///
/// [transaction]: https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
#[cfg(feature = "appservice")]
#[cfg_attr(feature = "docs", doc(cfg(appservice)))]
pub async fn receive_transaction(
&self,
incoming_transaction: crate::api_appservice::event::push_events::v1::IncomingRequest,
) -> Result<()> {
let txn_id = incoming_transaction.txn_id.clone();
let response = incoming_transaction.try_into_sync_response(txn_id)?;
let base_client = self.base_client.clone();
let sync_response = base_client.receive_sync_response(response).await?;
if let Some(handler) = self.event_handler.read().await.as_ref() {
handler.handle_sync(&sync_response).await;
}
Ok(())
}
/// Is the client logged in. /// Is the client logged in.
pub async fn logged_in(&self) -> bool { pub async fn logged_in(&self) -> bool {
self.base_client.logged_in().await self.base_client.logged_in().await

View File

@ -82,6 +82,11 @@ pub enum HttpError {
/// The given request can't be cloned and thus can't be retried. /// The given request can't be cloned and thus can't be retried.
#[error("The request cannot be cloned")] #[error("The request cannot be cloned")]
UnableToCloneRequest, UnableToCloneRequest,
/// Tried to send a request without `user_id` in the `Session`
#[error("missing user_id in session")]
#[cfg(feature = "appservice")]
UserIdRequired,
} }
/// Internal representation of errors. /// Internal representation of errors.

View File

@ -100,6 +100,9 @@ pub(crate) struct HttpClient {
pub(crate) request_config: RequestConfig, pub(crate) request_config: RequestConfig,
} }
#[cfg(feature = "appservice")]
use crate::OutgoingRequestAppserviceExt;
impl HttpClient { impl HttpClient {
async fn send_request<Request: OutgoingRequest>( async fn send_request<Request: OutgoingRequest>(
&self, &self,
@ -112,7 +115,26 @@ impl HttpClient {
None => self.request_config, None => self.request_config,
}; };
let request = { #[cfg(not(feature = "appservice"))]
let request = self.try_into_http_request(request, session, config).await?;
#[cfg(feature = "appservice")]
let request = if !self.request_config.assert_identity {
self.try_into_http_request(request, session, config).await?
} else {
self.try_into_http_request_with_identy_assertion(request, session, config)
.await?
};
self.inner.send_request(request, config).await
}
async fn try_into_http_request<Request: OutgoingRequest>(
&self,
request: Request,
session: Arc<RwLock<Option<Session>>>,
config: RequestConfig,
) -> Result<http::Request<Bytes>, HttpError> {
let read_guard; let read_guard;
let access_token = if config.force_auth { let access_token = if config.force_auth {
read_guard = session.read().await; read_guard = session.read().await;
@ -137,12 +159,42 @@ impl HttpClient {
} }
}; };
request let http_request = request
.try_into_http_request::<BytesMut>(&self.homeserver.to_string(), access_token)? .try_into_http_request::<BytesMut>(&self.homeserver.to_string(), access_token)?
.map(|body| body.freeze()) .map(|body| body.freeze());
Ok(http_request)
}
#[cfg(feature = "appservice")]
async fn try_into_http_request_with_identy_assertion<Request: OutgoingRequest>(
&self,
request: Request,
session: Arc<RwLock<Option<Session>>>,
_: RequestConfig,
) -> Result<http::Request<Bytes>, HttpError> {
let read_guard = session.read().await;
let access_token = if let Some(session) = read_guard.as_ref() {
SendAccessToken::Always(session.access_token.as_str())
} else {
return Err(HttpError::AuthenticationRequired);
}; };
self.inner.send_request(request, config).await let user_id = if let Some(session) = read_guard.as_ref() {
session.user_id.clone()
} else {
return Err(HttpError::UserIdRequired);
};
let http_request = request
.try_into_http_request_with_user_id::<BytesMut>(
&self.homeserver.to_string(),
access_token,
user_id,
)?
.map(|body| body.freeze());
Ok(http_request)
} }
pub async fn upload( pub async fn upload(

View File

@ -47,6 +47,8 @@
//! * `require_auth_for_profile_requests`: Whether to send the access token in the authentication //! * `require_auth_for_profile_requests`: Whether to send the access token in the authentication
//! header when calling endpoints that retrieve profile data. This matches the synapse //! header when calling endpoints that retrieve profile data. This matches the synapse
//! configuration `require_auth_for_profile_requests`. Enabled by default. //! configuration `require_auth_for_profile_requests`. Enabled by default.
//! * `appservice`: Enables low-level appservice functionality. For an high-level API there's the
//! `matrix-sdk-appservice` crate
#![deny( #![deny(
missing_debug_implementations, missing_debug_implementations,

View File

@ -0,0 +1,41 @@
[package]
authors = ["Johannes Becker <j.becker@famedly.com>"]
edition = "2018"
homepage = "https://github.com/matrix-org/matrix-rust-sdk"
keywords = ["matrix", "chat", "messaging", "ruma", "nio", "appservice"]
license = "Apache-2.0"
name = "matrix-sdk-appservice"
version = "0.1.0"
[features]
default = ["actix"]
actix = ["actix-rt", "actix-web"]
docs = []
[dependencies]
actix-rt = { version = "2", optional = true }
actix-web = { version = "4.0.0-beta.6", optional = true }
futures = "0.3"
futures-util = "0.3"
http = "0.2"
regex = "1"
serde_yaml = "0.8"
thiserror = "1.0"
tracing = "0.1"
url = "2"
matrix-sdk = { version = "0.2", path = "../matrix_sdk", default-features = false, features = ["appservice", "native-tls"] }
[dev-dependencies]
env_logger = "0.8"
mockito = "0.30"
serde_json = "1"
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "macros"] }
tracing-subscriber = "0.2"
matrix-sdk-test = { version = "0.2", path = "../matrix_sdk_test" }
[[example]]
name = "actix_autojoin"
required-features = ["actix"]

View File

@ -0,0 +1,80 @@
use std::{convert::TryFrom, env};
use actix_web::{App, HttpServer};
use matrix_sdk::{
async_trait,
events::{
room::member::{MemberEventContent, MembershipState},
SyncStateEvent,
},
identifiers::UserId,
room::Room,
EventHandler,
};
use matrix_sdk_appservice::{Appservice, AppserviceRegistration};
struct AppserviceEventHandler {
appservice: Appservice,
}
impl AppserviceEventHandler {
pub fn new(appservice: Appservice) -> Self {
Self { appservice }
}
}
#[async_trait]
impl EventHandler for AppserviceEventHandler {
async fn on_room_member(&self, room: Room, event: &SyncStateEvent<MemberEventContent>) {
if !self
.appservice
.user_id_is_in_namespace(&event.state_key)
.unwrap()
{
dbg!("not an appservice user");
return;
}
if let MembershipState::Invite = event.content.membership {
let user_id = UserId::try_from(event.state_key.clone()).unwrap();
let client = self
.appservice
.client_with_localpart(user_id.localpart())
.await
.unwrap();
client.join_room_by_id(room.room_id()).await.unwrap();
}
}
}
#[actix_web::main]
pub async fn main() -> std::io::Result<()> {
env::set_var(
"RUST_LOG",
"actix_web=debug,actix_server=info,matrix_sdk=debug",
);
tracing_subscriber::fmt::init();
let homeserver_url = "http://localhost:8008";
let server_name = "localhost";
let registration =
AppserviceRegistration::try_from_yaml_file("./tests/registration.yaml").unwrap();
let appservice = Appservice::new(homeserver_url, server_name, registration)
.await
.unwrap();
let event_handler = AppserviceEventHandler::new(appservice.clone());
appservice
.client()
.set_event_handler(Box::new(event_handler))
.await;
HttpServer::new(move || App::new().service(appservice.actix_service()))
.bind(("0.0.0.0", 8090))?
.run()
.await
}

View File

@ -0,0 +1,182 @@
// Copyright 2021 Famedly GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
convert::{TryFrom, TryInto},
pin::Pin,
};
use actix_web::{
dev::Payload,
error::PayloadError,
get,
http::PathAndQuery,
put,
web::{self, BytesMut, Data},
App, FromRequest, HttpRequest, HttpResponse, HttpServer,
};
use futures::Future;
use futures_util::{TryFutureExt, TryStreamExt};
use matrix_sdk::api_appservice as api;
pub use actix_web::Scope;
use crate::{error::Error, Appservice};
pub async fn run_server(
appservice: Appservice,
host: impl AsRef<str>,
port: impl Into<u16>,
) -> Result<(), Error> {
HttpServer::new(move || App::new().service(appservice.actix_service()))
.bind((host.as_ref(), port.into()))?
.run()
.await?;
Ok(())
}
pub fn get_scope() -> Scope {
gen_scope("/"). // handle legacy routes
service(gen_scope("/_matrix/app/v1"))
}
fn gen_scope(scope: &str) -> Scope {
web::scope(scope)
.service(push_transactions)
.service(query_user_id)
.service(query_room_alias)
}
#[tracing::instrument]
#[put("/transactions/{txn_id}")]
async fn push_transactions(
request: IncomingRequest<api::event::push_events::v1::IncomingRequest>,
appservice: Data<Appservice>,
) -> Result<HttpResponse, Error> {
if !appservice.hs_token_matches(request.access_token) {
return Ok(HttpResponse::Unauthorized().finish());
}
appservice
.client()
.receive_transaction(request.incoming)
.await
.unwrap();
Ok(HttpResponse::Ok().json("{}"))
}
#[tracing::instrument]
#[get("/users/{user_id}")]
async fn query_user_id(
request: IncomingRequest<api::query::query_user_id::v1::IncomingRequest>,
appservice: Data<Appservice>,
) -> Result<HttpResponse, Error> {
if !appservice.hs_token_matches(request.access_token) {
return Ok(HttpResponse::Unauthorized().finish());
}
Ok(HttpResponse::Ok().json("{}"))
}
#[tracing::instrument]
#[get("/rooms/{room_alias}")]
async fn query_room_alias(
request: IncomingRequest<api::query::query_room_alias::v1::IncomingRequest>,
appservice: Data<Appservice>,
) -> Result<HttpResponse, Error> {
if !appservice.hs_token_matches(request.access_token) {
return Ok(HttpResponse::Unauthorized().finish());
}
Ok(HttpResponse::Ok().json("{}"))
}
#[derive(Debug)]
pub struct IncomingRequest<T> {
access_token: String,
incoming: T,
}
impl<T: matrix_sdk::IncomingRequest> FromRequest for IncomingRequest<T> {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
type Config = ();
fn from_request(request: &HttpRequest, payload: &mut Payload) -> Self::Future {
let request = request.to_owned();
let payload = payload.take();
Box::pin(async move {
let uri = request.uri().to_owned();
let uri = if !uri.path().starts_with("/_matrix/app/v1") {
// rename legacy routes
let mut parts = uri.into_parts();
let path_and_query = match parts.path_and_query {
Some(path_and_query) => format!("/_matrix/app/v1{}", path_and_query),
None => "/_matrix/app/v1".to_owned(),
};
parts.path_and_query =
Some(PathAndQuery::try_from(path_and_query).map_err(http::Error::from)?);
parts.try_into().map_err(http::Error::from)?
} else {
uri
};
let mut builder = http::request::Builder::new()
.method(request.method())
.uri(uri);
let headers = builder
.headers_mut()
.ok_or(Error::UnknownHttpRequestBuilder)?;
for (key, value) in request.headers().iter() {
headers.append(key, value.to_owned());
}
let bytes = payload
.try_fold(BytesMut::new(), |mut body, chunk| async move {
body.extend_from_slice(&chunk);
Ok::<_, PayloadError>(body)
})
.and_then(|bytes| async move { Ok::<Vec<u8>, _>(bytes.into_iter().collect()) })
.await?;
let access_token = match request.uri().query() {
Some(query) => {
let query: Vec<(String, String)> = matrix_sdk::urlencoded::from_str(query)?;
query
.into_iter()
.find(|(key, _)| key == "access_token")
.map(|(_, value)| value)
}
None => None,
};
let access_token = match access_token {
Some(access_token) => access_token,
None => return Err(Error::MissingAccessToken),
};
let request = builder.body(bytes)?;
Ok(IncomingRequest {
access_token,
incoming: matrix_sdk::IncomingRequest::try_from_http_request(request)?,
})
})
}
}

View File

@ -0,0 +1,74 @@
// Copyright 2021 Famedly GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("tried to run without webserver configured")]
RunWithoutServer,
#[error("missing access token")]
MissingAccessToken,
#[error("missing host on registration url")]
MissingRegistrationHost,
#[error("http request builder error")]
UnknownHttpRequestBuilder,
#[error("no port found")]
MissingRegistrationPort,
#[error(transparent)]
HttpRequest(#[from] matrix_sdk::FromHttpRequestError),
#[error(transparent)]
Identifier(#[from] matrix_sdk::identifiers::Error),
#[error(transparent)]
Http(#[from] http::Error),
#[error(transparent)]
Url(#[from] url::ParseError),
#[error(transparent)]
Serde(#[from] matrix_sdk::SerdeError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
InvalidUri(#[from] http::uri::InvalidUri),
#[error(transparent)]
Matrix(#[from] matrix_sdk::Error),
#[error(transparent)]
Regex(#[from] regex::Error),
#[error(transparent)]
SerdeYaml(#[from] serde_yaml::Error),
#[cfg(feature = "actix")]
#[error(transparent)]
Actix(#[from] actix_web::Error),
#[cfg(feature = "actix")]
#[error(transparent)]
ActixPayload(#[from] actix_web::error::PayloadError),
}
#[cfg(feature = "actix")]
impl actix_web::error::ResponseError for Error {}

View File

@ -0,0 +1,331 @@
// Copyright 2021 Famedly GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Matrix [Application Service] library
//!
//! The appservice crate aims to provide a batteries-included experience. That means that we
//! * ship with functionality to configure your webserver crate or simply run the webserver for you
//! * receive and validate requests from the homeserver correctly
//! * allow calling the homeserver with proper virtual user identity assertion
//! * have the goal to have a consistent room state available by leveraging the stores that the matrix-sdk provides
//!
//! # Quickstart
//!
//! ```no_run
//! # async {
//! use matrix_sdk_appservice::{Appservice, AppserviceRegistration};
//!
//! let homeserver_url = "http://127.0.0.1:8008";
//! let server_name = "localhost";
//! let registration = AppserviceRegistration::try_from_yaml_str(
//! r"
//! id: appservice
//! url: http://127.0.0.1:9009
//! as_token: as_token
//! hs_token: hs_token
//! sender_localpart: _appservice
//! namespaces:
//! users:
//! - exclusive: true
//! regex: '@_appservice_.*'
//! ")
//! .unwrap();
//!
//! let appservice = Appservice::new(homeserver_url, server_name, registration).await.unwrap();
//! // set event handler with `appservice.client().set_event_handler()` here
//! let (host, port) = appservice.get_host_and_port_from_registration().unwrap();
//! appservice.run(host, port).await.unwrap();
//! # };
//! ```
//!
//! [Application Service]: https://matrix.org/docs/spec/application_service/r0.1.2
#[cfg(not(any(feature = "actix",)))]
compile_error!("one webserver feature must be enabled. available ones: `actix`");
use std::{
convert::{TryFrom, TryInto},
fs::File,
ops::Deref,
path::PathBuf,
};
use http::Uri;
use matrix_sdk::{
api::{
error::ErrorKind,
r0::{
account::register::{LoginType, Request as RegistrationRequest},
uiaa::UiaaResponse,
},
},
api_appservice::Registration,
assign,
identifiers::{self, DeviceId, ServerNameBox, UserId},
reqwest::Url,
Client, ClientConfig, FromHttpResponseError, HttpError, RequestConfig, ServerError, Session,
};
use regex::Regex;
#[cfg(not(feature = "actix"))]
use tracing::error;
use tracing::warn;
#[doc(inline)]
pub use matrix_sdk::api_appservice as api;
#[cfg(feature = "actix")]
mod actix;
mod error;
pub use error::Error;
pub type Result<T> = std::result::Result<T, Error>;
pub type Host = String;
pub type Port = u16;
/// Appservice Registration
#[derive(Debug, Clone)]
pub struct AppserviceRegistration {
inner: Registration,
}
impl AppserviceRegistration {
/// Try to load registration from yaml string
///
/// See the fields of [`Registration`] for the required format
pub fn try_from_yaml_str(value: impl AsRef<str>) -> Result<Self> {
Ok(Self {
inner: serde_yaml::from_str(value.as_ref())?,
})
}
/// Try to load registration from yaml file
///
/// See the fields of [`Registration`] for the required format
pub fn try_from_yaml_file(path: impl Into<PathBuf>) -> Result<Self> {
let file = File::open(path.into())?;
Ok(Self {
inner: serde_yaml::from_reader(file)?,
})
}
}
impl From<Registration> for AppserviceRegistration {
fn from(value: Registration) -> Self {
Self { inner: value }
}
}
impl Deref for AppserviceRegistration {
type Target = Registration;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
async fn create_client(
homeserver_url: &Url,
server_name: &ServerNameBox,
registration: &AppserviceRegistration,
localpart: Option<&str>,
) -> Result<Client> {
let client = if localpart.is_some() {
let request_config = RequestConfig::default().assert_identity();
let config = ClientConfig::default().request_config(request_config);
Client::new_with_config(homeserver_url.clone(), config)?
} else {
Client::new(homeserver_url.clone())?
};
let session = Session {
access_token: registration.as_token.clone(),
user_id: UserId::parse_with_server_name(
localpart.unwrap_or(&registration.sender_localpart),
&server_name,
)?,
device_id: DeviceId::new(),
};
client.restore_login(session).await?;
Ok(client)
}
/// Appservice
#[derive(Debug, Clone)]
pub struct Appservice {
homeserver_url: Url,
server_name: ServerNameBox,
registration: AppserviceRegistration,
client_sender_localpart: Client,
}
impl Appservice {
/// Create new Appservice
///
/// # Arguments
///
/// * `homeserver_url` - The homeserver that the client should connect to.
/// * `server_name` - The server name to use when constructing user ids from the localpart.
/// * `registration` - The [Appservice Registration] to use when interacting with the homserver.
///
/// [Appservice Registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration
pub async fn new(
homeserver_url: impl TryInto<Url, Error = url::ParseError>,
server_name: impl TryInto<ServerNameBox, Error = identifiers::Error>,
registration: AppserviceRegistration,
) -> Result<Self> {
let homeserver_url = homeserver_url.try_into()?;
let server_name = server_name.try_into()?;
let client = create_client(&homeserver_url, &server_name, &registration, None).await?;
Ok(Appservice {
homeserver_url,
server_name,
registration,
client_sender_localpart: client,
})
}
/// Get `Client` for the user associated with the application service
/// (`sender_localpart` of the [registration])
///
/// [registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration
pub fn client(&self) -> Client {
self.client_sender_localpart.clone()
}
/// Get `Client` for the given `localpart`
///
/// If the `localpart` is covered by the `namespaces` in the [registration] all requests to the
/// homeserver will [assert the identity] to the according virtual user.
///
/// [registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration
/// [assert the identity]:
/// https://matrix.org/docs/spec/application_service/r0.1.2#identity-assertion
pub async fn client_with_localpart(
&self,
localpart: impl AsRef<str> + Into<Box<str>>,
) -> Result<Client> {
let user_id = UserId::parse_with_server_name(localpart, &self.server_name)?;
let localpart = user_id.localpart().to_owned();
let client = create_client(
&self.homeserver_url,
&self.server_name,
&self.registration,
Some(&localpart),
)
.await?;
self.ensure_registered(localpart).await?;
Ok(client)
}
async fn ensure_registered(&self, localpart: impl AsRef<str>) -> Result<()> {
let request = assign!(RegistrationRequest::new(), {
username: Some(localpart.as_ref()),
login_type: Some(&LoginType::ApplicationService),
});
match self.client().register(request).await {
Ok(_) => (),
Err(error) => match error {
matrix_sdk::Error::Http(HttpError::UiaaError(FromHttpResponseError::Http(
ServerError::Known(UiaaResponse::MatrixError(ref matrix_error)),
))) => {
match matrix_error.kind {
ErrorKind::UserInUse => {
// TODO: persist the fact that we registered that user
warn!("{}", matrix_error.message);
}
_ => return Err(error.into()),
}
}
_ => return Err(error.into()),
},
}
Ok(())
}
/// Get the Appservice [registration]
///
/// [registration]: https://matrix.org/docs/spec/application_service/r0.1.2#registration
pub fn registration(&self) -> &Registration {
&self.registration
}
/// Compare the given `hs_token` against `registration.hs_token`
///
/// Returns `true` if the tokens match, `false` otherwise.
pub fn hs_token_matches(&self, hs_token: impl AsRef<str>) -> bool {
self.registration.hs_token == hs_token.as_ref()
}
/// Check if given `user_id` is in any of the registration user namespaces
pub fn user_id_is_in_namespace(&self, user_id: impl AsRef<str>) -> Result<bool> {
for user in &self.registration.namespaces.users {
// TODO: precompile on Appservice construction
let re = Regex::new(&user.regex)?;
if re.is_match(user_id.as_ref()) {
return Ok(true);
}
}
Ok(false)
}
/// Get the host and port from the registration URL
///
/// If no port is found it falls back to scheme defaults: 80 for http and 443 for https
pub fn get_host_and_port_from_registration(&self) -> Result<(Host, Port)> {
let uri = Uri::try_from(&self.registration.url)?;
let host = uri.host().ok_or(Error::MissingRegistrationHost)?.to_owned();
let port = match uri.port() {
Some(port) => Ok(port.as_u16()),
None => match uri.scheme_str() {
Some("http") => Ok(80),
Some("https") => Ok(443),
_ => Err(Error::MissingRegistrationPort),
},
}?;
Ok((host, port))
}
/// Service to register on an Actix `App`
#[cfg(feature = "actix")]
#[cfg_attr(docs, doc(cfg(feature = "actix")))]
pub fn actix_service(&self) -> actix::Scope {
actix::get_scope().data(self.clone())
}
/// Convenience method that runs an http server depending on the selected server feature
///
/// This is a blocking call that tries to listen on the provided host and port
pub async fn run(&self, host: impl AsRef<str>, port: impl Into<u16>) -> Result<()> {
#[cfg(feature = "actix")]
{
actix::run_server(self.clone(), host, port).await?;
Ok(())
}
#[cfg(not(any(feature = "actix",)))]
unreachable!()
}
}

View File

@ -0,0 +1,115 @@
#[cfg(feature = "actix")]
mod actix {
use actix_web::{test, App};
use matrix_sdk_appservice::*;
use std::env;
async fn appservice() -> Appservice {
env::set_var(
"RUST_LOG",
"mockito=debug,matrix_sdk=debug,ruma=debug,actix_web=debug",
);
let _ = tracing_subscriber::fmt::try_init();
Appservice::new(
mockito::server_url().as_ref(),
"test.local",
AppserviceRegistration::try_from_yaml_file("./tests/registration.yaml").unwrap(),
)
.await
.unwrap()
}
#[actix_rt::test]
async fn test_transactions() {
let appservice = appservice().await;
let app = test::init_service(App::new().service(appservice.actix_service())).await;
let transactions = r#"{
"events": [
{
"content": {},
"type": "m.dummy"
}
]
}"#;
let transactions: serde_json::Value = serde_json::from_str(transactions).unwrap();
let req = test::TestRequest::put()
.uri("/_matrix/app/v1/transactions/1?access_token=hs_token")
.set_json(&transactions)
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
}
#[actix_rt::test]
async fn test_users() {
let appservice = appservice().await;
let app = test::init_service(App::new().service(appservice.actix_service())).await;
let req = test::TestRequest::get()
.uri("/_matrix/app/v1/users/%40_botty_1%3Adev.famedly.local?access_token=hs_token")
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), 200);
}
#[actix_rt::test]
async fn test_invalid_access_token() {
let appservice = appservice().await;
let app = test::init_service(App::new().service(appservice.actix_service())).await;
let transactions = r#"{
"events": [
{
"content": {},
"type": "m.dummy"
}
]
}"#;
let transactions: serde_json::Value = serde_json::from_str(transactions).unwrap();
let req = test::TestRequest::put()
.uri("/_matrix/app/v1/transactions/1?access_token=invalid_token")
.set_json(&transactions)
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), 401);
}
#[actix_rt::test]
async fn test_no_access_token() {
let appservice = appservice().await;
let app = test::init_service(App::new().service(appservice.actix_service())).await;
let transactions = r#"{
"events": [
{
"content": {},
"type": "m.dummy"
}
]
}"#;
let transactions: serde_json::Value = serde_json::from_str(transactions).unwrap();
let req = test::TestRequest::put()
.uri("/_matrix/app/v1/transactions/1")
.set_json(&transactions)
.to_request();
let resp = test::call_service(&app, req).await;
// TODO: this should actually return a 401 but is 500 because something in the extractor fails
assert_eq!(resp.status(), 500);
}
}

View File

@ -0,0 +1,13 @@
id: appservice
url: http://localhost:9009
as_token: as_token
hs_token: hs_token
sender_localpart: _appservice
namespaces:
aliases: []
rooms: []
users:
- exclusive: true
regex: '@_appservice_.*'
rate_limited: false
protocols: []

View File

@ -0,0 +1,157 @@
use std::env;
use matrix_sdk::{
api_appservice,
api_appservice::Registration,
async_trait,
events::{room::member::MemberEventContent, AnyEvent, AnyStateEvent, SyncStateEvent},
room::Room,
EventHandler, Raw,
};
use matrix_sdk_appservice::*;
use matrix_sdk_test::async_test;
use serde_json::json;
fn registration_string() -> String {
include_str!("../tests/registration.yaml").to_owned()
}
async fn appservice(registration: Option<Registration>) -> Result<Appservice> {
env::set_var("RUST_LOG", "mockito=debug,matrix_sdk=debug");
let _ = tracing_subscriber::fmt::try_init();
let registration = match registration {
Some(registration) => registration.into(),
None => AppserviceRegistration::try_from_yaml_str(registration_string()).unwrap(),
};
let homeserver_url = mockito::server_url();
let server_name = "localhost";
Ok(Appservice::new(homeserver_url.as_ref(), server_name, registration).await?)
}
fn member_json() -> serde_json::Value {
json!({
"content": {
"avatar_url": null,
"displayname": "example",
"membership": "join"
},
"event_id": "$151800140517rfvjc:localhost",
"membership": "join",
"origin_server_ts": 151800140,
"room_id": "!ahpSDaDUPCCqktjUEF:localhost",
"sender": "@example:localhost",
"state_key": "@example:localhost",
"type": "m.room.member",
"prev_content": {
"avatar_url": null,
"displayname": "example",
"membership": "invite"
},
"unsigned": {
"age": 297036,
"replaces_state": "$151800111315tsynI:localhost"
}
})
}
#[async_test]
async fn test_event_handler() -> Result<()> {
let appservice = appservice(None).await?;
struct Example {}
impl Example {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl EventHandler for Example {
async fn on_state_member(&self, room: Room, event: &SyncStateEvent<MemberEventContent>) {
dbg!(room, event);
}
}
appservice
.client()
.set_event_handler(Box::new(Example::new()))
.await;
let event = serde_json::from_value::<AnyStateEvent>(member_json()).unwrap();
let event: Raw<AnyEvent> = AnyEvent::State(event).into();
let events = vec![event];
let incoming = api_appservice::event::push_events::v1::IncomingRequest::new(
"any_txn_id".to_owned(),
events,
);
appservice.client().receive_transaction(incoming).await?;
Ok(())
}
#[async_test]
async fn test_transaction() -> Result<()> {
let appservice = appservice(None).await?;
let event = serde_json::from_value::<AnyStateEvent>(member_json()).unwrap();
let event: Raw<AnyEvent> = AnyEvent::State(event).into();
let events = vec![event];
let incoming = api_appservice::event::push_events::v1::IncomingRequest::new(
"any_txn_id".to_owned(),
events,
);
appservice.client().receive_transaction(incoming).await?;
Ok(())
}
#[async_test]
async fn test_verify_hs_token() -> Result<()> {
let appservice = appservice(None).await?;
let registration = appservice.registration();
assert!(appservice.hs_token_matches(&registration.hs_token));
Ok(())
}
mod registration {
use super::*;
#[test]
fn test_registration() -> Result<()> {
let registration: Registration = serde_yaml::from_str(&registration_string())?;
let registration: AppserviceRegistration = registration.into();
assert_eq!(registration.id, "appservice");
Ok(())
}
#[test]
fn test_registration_from_yaml_file() -> Result<()> {
let registration = AppserviceRegistration::try_from_yaml_file("./tests/registration.yaml")?;
assert_eq!(registration.id, "appservice");
Ok(())
}
#[test]
fn test_registration_from_yaml_str() -> Result<()> {
let registration = AppserviceRegistration::try_from_yaml_str(registration_string())?;
assert_eq!(registration.id, "appservice");
Ok(())
}
}

View File

@ -12,6 +12,7 @@ version = "0.2.0"
[features] [features]
markdown = ["ruma/markdown"] markdown = ["ruma/markdown"]
appservice = ["ruma/appservice-api", "ruma/appservice-api-helper", "ruma/rand"]
[dependencies] [dependencies]
instant = { version = "0.1.9", features = ["wasm-bindgen", "now"] } instant = { version = "0.1.9", features = ["wasm-bindgen", "now"] }

View File

@ -1,5 +1,10 @@
pub use async_trait::async_trait; pub use async_trait::async_trait;
pub use instant; pub use instant;
#[cfg(feature = "appservice")]
pub use ruma::{
api::{appservice as api_appservice, IncomingRequest, OutgoingRequestAppserviceExt},
serde::{exports::serde::de::value::Error as SerdeError, urlencoded},
};
pub use ruma::{ pub use ruma::{
api::{ api::{
client as api, client as api,