From 71500b14b902321e91cab432d55dd3f3ae7aedfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Tue, 15 Sep 2020 08:16:20 +0200 Subject: [PATCH] fix: send to all servers and fix media store --- src/client_server/alias.rs | 2 +- src/client_server/directory.rs | 2 +- src/client_server/media.rs | 38 ++++++++-------------------- src/client_server/membership.rs | 4 +-- src/database/rooms.rs | 44 ++++++++++++++++++++++----------- src/server_server.rs | 29 ++++++++++++---------- 6 files changed, 60 insertions(+), 59 deletions(-) diff --git a/src/client_server/alias.rs b/src/client_server/alias.rs index c5c514e..c2c3eb9 100644 --- a/src/client_server/alias.rs +++ b/src/client_server/alias.rs @@ -63,7 +63,7 @@ pub async fn get_alias_helper( if room_alias.server_name() != db.globals.server_name() { let response = server_server::send_request( &db.globals, - room_alias.server_name(), + room_alias.server_name().to_owned(), federation::query::get_room_information::v1::Request { room_alias }, ) .await?; diff --git a/src/client_server/directory.rs b/src/client_server/directory.rs index 372ce98..c82a15f 100644 --- a/src/client_server/directory.rs +++ b/src/client_server/directory.rs @@ -122,7 +122,7 @@ pub async fn get_public_rooms_filtered_helper( { let response = server_server::send_request( &db.globals, - other_server, + other_server.to_owned(), federation::directory::get_public_rooms_filtered::v1::Request { limit, since: since.as_deref(), diff --git a/src/client_server/media.rs b/src/client_server/media.rs index 8f7a9b9..8a93d49 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -48,22 +48,13 @@ pub fn create_content_route( #[cfg_attr( feature = "conduit_bin", - get( - "/_matrix/media/r0/download/<_server_name>/<_media_id>", - data = "" - ) + get("/_matrix/media/r0/download/<_>/<_>", data = "") )] pub async fn get_content_route( db: State<'_, Database>, body: Ruma>, - _server_name: String, - _media_id: String, ) -> ConduitResult { - let mxc = format!( - "mxc://{}/{}", - db.globals.server_name(), - utils::random_string(MXC_LENGTH) - ); + let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); if let Some(FileMeta { filename, @@ -77,10 +68,10 @@ pub async fn get_content_route( content_disposition: filename.unwrap_or_default(), // TODO: Spec says this should be optional } .into()) - } else if body.allow_remote { + } else if &*body.server_name != db.globals.server_name() && body.allow_remote { let get_content_response = server_server::send_request( &db.globals, - body.server_name.as_ref(), + body.server_name.clone(), get_content::Request { allow_remote: false, server_name: &body.server_name, @@ -104,21 +95,18 @@ pub async fn get_content_route( #[cfg_attr( feature = "conduit_bin", - get( - "/_matrix/media/r0/thumbnail/<_server_name>/<_media_id>", - data = "" - ) + get("/_matrix/media/r0/thumbnail/<_>/<_>", data = "") )] pub async fn get_content_thumbnail_route( db: State<'_, Database>, body: Ruma>, - _server_name: String, - _media_id: String, ) -> ConduitResult { + let mxc = format!("mxc://{}/{}", body.server_name, body.media_id); + if let Some(FileMeta { content_type, file, .. }) = db.media.get_thumbnail( - format!("mxc://{}/{}", body.server_name, body.media_id), + mxc.clone(), body.width .try_into() .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, @@ -127,10 +115,10 @@ pub async fn get_content_thumbnail_route( .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, )? { Ok(get_content_thumbnail::Response { file, content_type }.into()) - } else if body.allow_remote { + } else if &*body.server_name != db.globals.server_name() && body.allow_remote { let get_thumbnail_response = server_server::send_request( &db.globals, - body.server_name.as_ref(), + body.server_name.clone(), get_content_thumbnail::Request { allow_remote: false, height: body.height, @@ -142,12 +130,6 @@ pub async fn get_content_thumbnail_route( ) .await?; - let mxc = format!( - "mxc://{}/{}", - db.globals.server_name(), - utils::random_string(MXC_LENGTH) - ); - db.media.upload_thumbnail( mxc, &None, diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 18fb5a9..f60601f 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -440,7 +440,7 @@ async fn join_room_by_id_helper( for remote_server in servers { let make_join_response = server_server::send_request( &db.globals, - remote_server, + remote_server.clone(), federation::membership::create_join_event_template::v1::Request { room_id, user_id: sender_id, @@ -501,7 +501,7 @@ async fn join_room_by_id_helper( let send_join_response = server_server::send_request( &db.globals, - remote_server, + remote_server.clone(), federation::membership::create_join_event::v2::Request { room_id, event_id: &event_id, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index ba54e7f..3c3a0b2 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1,9 +1,10 @@ mod edus; pub use edus::RoomEdus; +use rocket::futures; use crate::{pdu::PduBuilder, server_server, utils, Error, PduEvent, Result}; -use log::error; +use log::{error, warn}; use ring::digest; use ruma::{ api::client::error::ErrorKind, @@ -833,20 +834,35 @@ impl Rooms { .expect("json is object") .remove("event_id"); - let response = server_server::send_request( - &globals, - "koesters.xyz".try_into().unwrap(), - federation::transactions::send_transaction_message::v1::Request { - origin: globals.server_name(), - pdus: &[serde_json::from_value(pdu_json).expect("Raw::from_value always works")], - edus: &[], - origin_server_ts: SystemTime::now(), - transaction_id: &utils::random_string(16), - }, - ) - .await; + let raw_json = + serde_json::from_value::>(pdu_json).expect("Raw::from_value always works"); - let _ = dbg!(response); + let pdus = &[raw_json]; + let transaction_id = utils::random_string(16); + + for result in futures::future::join_all( + self.room_servers(room_id) + .filter_map(|r| r.ok()) + .filter(|server| &**server != globals.server_name()) + .map(|server| { + server_server::send_request( + &globals, + server, + federation::transactions::send_transaction_message::v1::Request { + origin: globals.server_name(), + pdus, + edus: &[], + origin_server_ts: SystemTime::now(), + transaction_id: &transaction_id, + }, + ) + }), + ) + .await { + if let Err(e) = result { + warn!("{}", e); + } + } Ok(pdu.event_id) } diff --git a/src/server_server.rs b/src/server_server.rs index 9f4be13..da5a6c1 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,5 +1,6 @@ use crate::{client_server, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use http::header::{HeaderValue, AUTHORIZATION}; +use log::warn; use rocket::{get, post, put, response::content::Json, State}; use ruma::{ api::federation::directory::get_public_rooms_filtered, @@ -24,7 +25,10 @@ use std::{ time::{Duration, SystemTime}, }; -pub async fn request_well_known(globals: &crate::database::globals::Globals, destination: &str) -> Option { +pub async fn request_well_known( + globals: &crate::database::globals::Globals, + destination: &str, +) -> Option { let body: serde_json::Value = serde_json::from_str( &globals .reqwest_client() @@ -45,7 +49,7 @@ pub async fn request_well_known(globals: &crate::database::globals::Globals, des pub async fn send_request( globals: &crate::database::globals::Globals, - destination: &ServerName, + destination: Box, request: T, ) -> Result where @@ -79,10 +83,7 @@ where .to_string() .into(), ); - request_map.insert( - "origin".to_owned(), - globals.server_name().as_str().into(), - ); + request_map.insert("origin".to_owned(), globals.server_name().as_str().into()); request_map.insert("destination".to_owned(), destination.as_str().into()); let mut request_json = request_map.into(); @@ -144,10 +145,11 @@ where .into_iter() .collect(); - Ok( - T::IncomingResponse::try_from(http_response.body(body).unwrap()) - .expect("TODO: error handle other server errors"), - ) + let response = T::IncomingResponse::try_from(http_response.body(body).unwrap()); + response.map_err(|e| { + warn!("{}", e); + Error::BadServerResponse("Server returned bad response.") + }) } Err(e) => Err(e.into()), } @@ -316,10 +318,11 @@ pub fn send_transaction_message_route<'a>( .expect("ruma pdus are json objects") .insert("event_id".to_owned(), event_id.to_string().into()); - let pdu = - serde_json::from_value::(value.clone()).expect("all ruma pdus are conduit pdus"); + let pdu = serde_json::from_value::(value.clone()) + .expect("all ruma pdus are conduit pdus"); if db.rooms.exists(&pdu.room_id)? { - db.rooms.append_pdu(&pdu, &value, &db.globals, &db.account_data)?; + db.rooms + .append_pdu(&pdu, &value, &db.globals, &db.account_data)?; } } Ok(send_transaction_message::v1::Response {