fix: send to all servers and fix media store

next
Timo Kösters 2020-09-15 08:16:20 +02:00
parent f7816b11de
commit 71500b14b9
No known key found for this signature in database
GPG Key ID: 356E705610F626D5
6 changed files with 60 additions and 59 deletions

View File

@ -63,7 +63,7 @@ pub async fn get_alias_helper(
if room_alias.server_name() != db.globals.server_name() { if room_alias.server_name() != db.globals.server_name() {
let response = server_server::send_request( let response = server_server::send_request(
&db.globals, &db.globals,
room_alias.server_name(), room_alias.server_name().to_owned(),
federation::query::get_room_information::v1::Request { room_alias }, federation::query::get_room_information::v1::Request { room_alias },
) )
.await?; .await?;

View File

@ -122,7 +122,7 @@ pub async fn get_public_rooms_filtered_helper(
{ {
let response = server_server::send_request( let response = server_server::send_request(
&db.globals, &db.globals,
other_server, other_server.to_owned(),
federation::directory::get_public_rooms_filtered::v1::Request { federation::directory::get_public_rooms_filtered::v1::Request {
limit, limit,
since: since.as_deref(), since: since.as_deref(),

View File

@ -48,22 +48,13 @@ pub fn create_content_route(
#[cfg_attr( #[cfg_attr(
feature = "conduit_bin", feature = "conduit_bin",
get( get("/_matrix/media/r0/download/<_>/<_>", data = "<body>")
"/_matrix/media/r0/download/<_server_name>/<_media_id>",
data = "<body>"
)
)] )]
pub async fn get_content_route( pub async fn get_content_route(
db: State<'_, Database>, db: State<'_, Database>,
body: Ruma<get_content::Request<'_>>, body: Ruma<get_content::Request<'_>>,
_server_name: String,
_media_id: String,
) -> ConduitResult<get_content::Response> { ) -> ConduitResult<get_content::Response> {
let mxc = format!( let mxc = format!("mxc://{}/{}", body.server_name, body.media_id);
"mxc://{}/{}",
db.globals.server_name(),
utils::random_string(MXC_LENGTH)
);
if let Some(FileMeta { if let Some(FileMeta {
filename, filename,
@ -77,10 +68,10 @@ pub async fn get_content_route(
content_disposition: filename.unwrap_or_default(), // TODO: Spec says this should be optional content_disposition: filename.unwrap_or_default(), // TODO: Spec says this should be optional
} }
.into()) .into())
} else if body.allow_remote { } else if &*body.server_name != db.globals.server_name() && body.allow_remote {
let get_content_response = server_server::send_request( let get_content_response = server_server::send_request(
&db.globals, &db.globals,
body.server_name.as_ref(), body.server_name.clone(),
get_content::Request { get_content::Request {
allow_remote: false, allow_remote: false,
server_name: &body.server_name, server_name: &body.server_name,
@ -104,21 +95,18 @@ pub async fn get_content_route(
#[cfg_attr( #[cfg_attr(
feature = "conduit_bin", feature = "conduit_bin",
get( get("/_matrix/media/r0/thumbnail/<_>/<_>", data = "<body>")
"/_matrix/media/r0/thumbnail/<_server_name>/<_media_id>",
data = "<body>"
)
)] )]
pub async fn get_content_thumbnail_route( pub async fn get_content_thumbnail_route(
db: State<'_, Database>, db: State<'_, Database>,
body: Ruma<get_content_thumbnail::Request<'_>>, body: Ruma<get_content_thumbnail::Request<'_>>,
_server_name: String,
_media_id: String,
) -> ConduitResult<get_content_thumbnail::Response> { ) -> ConduitResult<get_content_thumbnail::Response> {
let mxc = format!("mxc://{}/{}", body.server_name, body.media_id);
if let Some(FileMeta { if let Some(FileMeta {
content_type, file, .. content_type, file, ..
}) = db.media.get_thumbnail( }) = db.media.get_thumbnail(
format!("mxc://{}/{}", body.server_name, body.media_id), mxc.clone(),
body.width body.width
.try_into() .try_into()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?,
@ -127,10 +115,10 @@ pub async fn get_content_thumbnail_route(
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?, .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Width is invalid."))?,
)? { )? {
Ok(get_content_thumbnail::Response { file, content_type }.into()) Ok(get_content_thumbnail::Response { file, content_type }.into())
} else if body.allow_remote { } else if &*body.server_name != db.globals.server_name() && body.allow_remote {
let get_thumbnail_response = server_server::send_request( let get_thumbnail_response = server_server::send_request(
&db.globals, &db.globals,
body.server_name.as_ref(), body.server_name.clone(),
get_content_thumbnail::Request { get_content_thumbnail::Request {
allow_remote: false, allow_remote: false,
height: body.height, height: body.height,
@ -142,12 +130,6 @@ pub async fn get_content_thumbnail_route(
) )
.await?; .await?;
let mxc = format!(
"mxc://{}/{}",
db.globals.server_name(),
utils::random_string(MXC_LENGTH)
);
db.media.upload_thumbnail( db.media.upload_thumbnail(
mxc, mxc,
&None, &None,

View File

@ -440,7 +440,7 @@ async fn join_room_by_id_helper(
for remote_server in servers { for remote_server in servers {
let make_join_response = server_server::send_request( let make_join_response = server_server::send_request(
&db.globals, &db.globals,
remote_server, remote_server.clone(),
federation::membership::create_join_event_template::v1::Request { federation::membership::create_join_event_template::v1::Request {
room_id, room_id,
user_id: sender_id, user_id: sender_id,
@ -501,7 +501,7 @@ async fn join_room_by_id_helper(
let send_join_response = server_server::send_request( let send_join_response = server_server::send_request(
&db.globals, &db.globals,
remote_server, remote_server.clone(),
federation::membership::create_join_event::v2::Request { federation::membership::create_join_event::v2::Request {
room_id, room_id,
event_id: &event_id, event_id: &event_id,

View File

@ -1,9 +1,10 @@
mod edus; mod edus;
pub use edus::RoomEdus; pub use edus::RoomEdus;
use rocket::futures;
use crate::{pdu::PduBuilder, server_server, utils, Error, PduEvent, Result}; use crate::{pdu::PduBuilder, server_server, utils, Error, PduEvent, Result};
use log::error; use log::{error, warn};
use ring::digest; use ring::digest;
use ruma::{ use ruma::{
api::client::error::ErrorKind, api::client::error::ErrorKind,
@ -833,20 +834,35 @@ impl Rooms {
.expect("json is object") .expect("json is object")
.remove("event_id"); .remove("event_id");
let response = server_server::send_request( let raw_json =
serde_json::from_value::<Raw<_>>(pdu_json).expect("Raw::from_value always works");
let pdus = &[raw_json];
let transaction_id = utils::random_string(16);
for result in futures::future::join_all(
self.room_servers(room_id)
.filter_map(|r| r.ok())
.filter(|server| &**server != globals.server_name())
.map(|server| {
server_server::send_request(
&globals, &globals,
"koesters.xyz".try_into().unwrap(), server,
federation::transactions::send_transaction_message::v1::Request { federation::transactions::send_transaction_message::v1::Request {
origin: globals.server_name(), origin: globals.server_name(),
pdus: &[serde_json::from_value(pdu_json).expect("Raw::from_value always works")], pdus,
edus: &[], edus: &[],
origin_server_ts: SystemTime::now(), origin_server_ts: SystemTime::now(),
transaction_id: &utils::random_string(16), transaction_id: &transaction_id,
}, },
) )
.await; }),
)
let _ = dbg!(response); .await {
if let Err(e) = result {
warn!("{}", e);
}
}
Ok(pdu.event_id) Ok(pdu.event_id)
} }

View File

@ -1,5 +1,6 @@
use crate::{client_server, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use crate::{client_server, ConduitResult, Database, Error, PduEvent, Result, Ruma};
use http::header::{HeaderValue, AUTHORIZATION}; use http::header::{HeaderValue, AUTHORIZATION};
use log::warn;
use rocket::{get, post, put, response::content::Json, State}; use rocket::{get, post, put, response::content::Json, State};
use ruma::{ use ruma::{
api::federation::directory::get_public_rooms_filtered, api::federation::directory::get_public_rooms_filtered,
@ -24,7 +25,10 @@ use std::{
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
pub async fn request_well_known(globals: &crate::database::globals::Globals, destination: &str) -> Option<String> { pub async fn request_well_known(
globals: &crate::database::globals::Globals,
destination: &str,
) -> Option<String> {
let body: serde_json::Value = serde_json::from_str( let body: serde_json::Value = serde_json::from_str(
&globals &globals
.reqwest_client() .reqwest_client()
@ -45,7 +49,7 @@ pub async fn request_well_known(globals: &crate::database::globals::Globals, des
pub async fn send_request<T: OutgoingRequest>( pub async fn send_request<T: OutgoingRequest>(
globals: &crate::database::globals::Globals, globals: &crate::database::globals::Globals,
destination: &ServerName, destination: Box<ServerName>,
request: T, request: T,
) -> Result<T::IncomingResponse> ) -> Result<T::IncomingResponse>
where where
@ -79,10 +83,7 @@ where
.to_string() .to_string()
.into(), .into(),
); );
request_map.insert( request_map.insert("origin".to_owned(), globals.server_name().as_str().into());
"origin".to_owned(),
globals.server_name().as_str().into(),
);
request_map.insert("destination".to_owned(), destination.as_str().into()); request_map.insert("destination".to_owned(), destination.as_str().into());
let mut request_json = request_map.into(); let mut request_json = request_map.into();
@ -144,10 +145,11 @@ where
.into_iter() .into_iter()
.collect(); .collect();
Ok( let response = T::IncomingResponse::try_from(http_response.body(body).unwrap());
T::IncomingResponse::try_from(http_response.body(body).unwrap()) response.map_err(|e| {
.expect("TODO: error handle other server errors"), warn!("{}", e);
) Error::BadServerResponse("Server returned bad response.")
})
} }
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
} }
@ -316,10 +318,11 @@ pub fn send_transaction_message_route<'a>(
.expect("ruma pdus are json objects") .expect("ruma pdus are json objects")
.insert("event_id".to_owned(), event_id.to_string().into()); .insert("event_id".to_owned(), event_id.to_string().into());
let pdu = let pdu = serde_json::from_value::<PduEvent>(value.clone())
serde_json::from_value::<PduEvent>(value.clone()).expect("all ruma pdus are conduit pdus"); .expect("all ruma pdus are conduit pdus");
if db.rooms.exists(&pdu.room_id)? { if db.rooms.exists(&pdu.room_id)? {
db.rooms.append_pdu(&pdu, &value, &db.globals, &db.account_data)?; db.rooms
.append_pdu(&pdu, &value, &db.globals, &db.account_data)?;
} }
} }
Ok(send_transaction_message::v1::Response { Ok(send_transaction_message::v1::Response {