From f7816b11de0889fca761f55510a3313dcfa78a42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 14 Sep 2020 20:23:19 +0200 Subject: [PATCH] feat: send messages over federation --- Cargo.lock | 32 ++--- src/client_server/account.rs | 4 +- src/client_server/alias.rs | 2 +- src/client_server/directory.rs | 2 +- src/client_server/media.rs | 4 +- src/client_server/membership.rs | 206 +++++++++++++++++--------------- src/client_server/message.rs | 4 +- src/client_server/profile.rs | 8 +- src/client_server/redact.rs | 4 +- src/client_server/room.rs | 37 +++--- src/client_server/state.rs | 12 +- src/database.rs | 1 + src/database/rooms.rs | 177 ++++++++++++++++++--------- src/pdu.rs | 27 ++++- src/server_server.rs | 22 ++-- 15 files changed, 324 insertions(+), 218 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bde0b0d..e0de2a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1414,7 +1414,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.0.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "ruma-api", "ruma-appservice-api", @@ -1430,7 +1430,7 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.17.0-alpha.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "http", "percent-encoding", @@ -1445,7 +1445,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.17.0-alpha.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1456,7 +1456,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.2.0-alpha.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "ruma-api", "ruma-common", @@ -1469,7 +1469,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.10.0-alpha.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "assign", "http", @@ -1488,7 +1488,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.2.0" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "js_int", "ruma-api", @@ -1502,7 +1502,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.22.0-alpha.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "js_int", "ruma-common", @@ -1517,7 +1517,7 @@ dependencies = [ [[package]] name = "ruma-events-macros" version = "0.22.0-alpha.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1528,7 +1528,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.0.3" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "js_int", "ruma-api", @@ -1543,7 +1543,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.17.4" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "rand", "ruma-identifiers-macros", @@ -1555,7 +1555,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.17.4" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "proc-macro2", "quote", @@ -1566,7 +1566,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.1.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "serde", "strum", @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.2.3" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "form_urlencoded", "itoa", @@ -1587,7 +1587,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.6.0-dev.1" -source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#088382dbdc176e61fa5bde679ae38093865e7053" +source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#6ccb3ecaf69167ba405379826a9d87a98f168df8" dependencies = [ "base64", "ring", @@ -1916,9 +1916,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.40" +version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963f7d3cc59b59b9325165add223142bbf1df27655d07789f109896d353d8350" +checksum = "6690e3e9f692504b941dc6c3b188fd28df054f7fb8469ab40680df52fdcc842b" dependencies = [ "proc-macro2", "quote", diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 3db933c..2ec9282 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -303,7 +303,7 @@ pub fn whoami_route(body: Ruma) -> ConduitResult, body: Ruma>, ) -> ConduitResult { @@ -366,7 +366,7 @@ pub fn deactivate_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; } // Remove devices and mark account as deactivated diff --git a/src/client_server/alias.rs b/src/client_server/alias.rs index 0ec43f5..c5c514e 100644 --- a/src/client_server/alias.rs +++ b/src/client_server/alias.rs @@ -62,7 +62,7 @@ pub async fn get_alias_helper( ) -> ConduitResult { if room_alias.server_name() != db.globals.server_name() { let response = server_server::send_request( - &db, + &db.globals, room_alias.server_name(), federation::query::get_room_information::v1::Request { room_alias }, ) diff --git a/src/client_server/directory.rs b/src/client_server/directory.rs index 871a780..372ce98 100644 --- a/src/client_server/directory.rs +++ b/src/client_server/directory.rs @@ -121,7 +121,7 @@ pub async fn get_public_rooms_filtered_helper( .filter(|server| *server != db.globals.server_name().as_str()) { let response = server_server::send_request( - &db, + &db.globals, other_server, federation::directory::get_public_rooms_filtered::v1::Request { limit, diff --git a/src/client_server/media.rs b/src/client_server/media.rs index f897a67..8f7a9b9 100644 --- a/src/client_server/media.rs +++ b/src/client_server/media.rs @@ -79,7 +79,7 @@ pub async fn get_content_route( .into()) } else if body.allow_remote { let get_content_response = server_server::send_request( - &db, + &db.globals, body.server_name.as_ref(), get_content::Request { allow_remote: false, @@ -129,7 +129,7 @@ pub async fn get_content_thumbnail_route( Ok(get_content_thumbnail::Response { file, content_type }.into()) } else if body.allow_remote { let get_thumbnail_response = server_server::send_request( - &db, + &db.globals, body.server_name.as_ref(), get_content_thumbnail::Request { allow_remote: false, diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 8d19402..18fb5a9 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -83,7 +83,7 @@ pub async fn join_room_by_id_or_alias_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/leave", data = "") )] -pub fn leave_room_route( +pub async fn leave_room_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -108,19 +108,21 @@ pub fn leave_room_route( event.membership = member::MembershipState::Leave; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_id.to_string()), - redacts: None, - }, - &sender_id, - &body.room_id, - &db.globals, - &db.account_data, - )?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_id.to_string()), + redacts: None, + }, + &sender_id, + &body.room_id, + &db.globals, + &db.account_data, + ) + .await?; Ok(leave_room::Response::new().into()) } @@ -129,33 +131,35 @@ pub fn leave_room_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/invite", data = "") )] -pub fn invite_user_route( +pub async fn invite_user_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { let sender_id = body.sender_id.as_ref().expect("user is authenticated"); if let invite_user::IncomingInvitationRecipient::UserId { user_id } = &body.recipient { - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(member::MemberEventContent { - membership: member::MembershipState::Invite, - displayname: db.users.displayname(&user_id)?, - avatar_url: db.users.avatar_url(&user_id)?, - is_direct: None, - third_party_invite: None, - }) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - &sender_id, - &body.room_id, - &db.globals, - &db.account_data, - )?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(member::MemberEventContent { + membership: member::MembershipState::Invite, + displayname: db.users.displayname(&user_id)?, + avatar_url: db.users.avatar_url(&user_id)?, + is_direct: None, + third_party_invite: None, + }) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(user_id.to_string()), + redacts: None, + }, + &sender_id, + &body.room_id, + &db.globals, + &db.account_data, + ) + .await?; Ok(invite_user::Response.into()) } else { @@ -167,7 +171,7 @@ pub fn invite_user_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/kick", data = "") )] -pub fn kick_user_route( +pub async fn kick_user_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -193,19 +197,21 @@ pub fn kick_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; // TODO: reason - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - &sender_id, - &body.room_id, - &db.globals, - &db.account_data, - )?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + &sender_id, + &body.room_id, + &db.globals, + &db.account_data, + ) + .await?; Ok(kick_user::Response::new().into()) } @@ -214,7 +220,7 @@ pub fn kick_user_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/ban", data = "") )] -pub fn ban_user_route( +pub async fn ban_user_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -248,19 +254,21 @@ pub fn ban_user_route( }, )?; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - &sender_id, - &body.room_id, - &db.globals, - &db.account_data, - )?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + &sender_id, + &body.room_id, + &db.globals, + &db.account_data, + ) + .await?; Ok(ban_user::Response::new().into()) } @@ -269,7 +277,7 @@ pub fn ban_user_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_>/unban", data = "") )] -pub fn unban_user_route( +pub async fn unban_user_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -294,19 +302,21 @@ pub fn unban_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(body.user_id.to_string()), - redacts: None, - }, - &sender_id, - &body.room_id, - &db.globals, - &db.account_data, - )?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(event).expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(body.user_id.to_string()), + redacts: None, + }, + &sender_id, + &body.room_id, + &db.globals, + &db.account_data, + ) + .await?; Ok(unban_user::Response::new().into()) } @@ -429,7 +439,7 @@ async fn join_room_by_id_helper( for remote_server in servers { let make_join_response = server_server::send_request( - &db, + &db.globals, remote_server, federation::membership::create_join_event_template::v1::Request { room_id, @@ -490,7 +500,7 @@ async fn join_room_by_id_helper( .expect("event is valid, we just created it"); let send_join_response = server_server::send_request( - &db, + &db.globals, remote_server, federation::membership::create_join_event::v2::Request { room_id, @@ -621,9 +631,12 @@ async fn join_room_by_id_helper( .expect("Found event_id in sorted events that is not in resolved state"); // We do not rebuild the PDU in this case only insert to DB - let pdu_id = - db.rooms - .append_pdu(&PduEvent::from(&**pdu), &db.globals, &db.account_data)?; + let pdu_id = db.rooms.append_pdu( + &PduEvent::from(&**pdu), + &serde_json::to_value(&**pdu).expect("PDU is valid value"), + &db.globals, + &db.account_data, + )?; if state_events.contains(ev_id) { state.insert( @@ -646,19 +659,22 @@ async fn join_room_by_id_helper( third_party_invite: None, }; - db.rooms.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(event).expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(sender_id.to_string()), - redacts: None, - }, - &sender_id, - &room_id, - &db.globals, - &db.account_data, - )?; + db.rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMember, + content: serde_json::to_value(event) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: Some(sender_id.to_string()), + redacts: None, + }, + &sender_id, + &room_id, + &db.globals, + &db.account_data, + ) + .await?; } Ok(join_room_by_id::Response::new(room_id.clone()).into()) diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 8a09aba..4ba0d9f 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -17,7 +17,7 @@ use rocket::{get, put}; feature = "conduit_bin", put("/_matrix/client/r0/rooms/<_>/send/<_>/<_>", data = "") )] -pub fn send_message_event_route( +pub async fn send_message_event_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -67,7 +67,7 @@ pub fn send_message_event_route( &body.room_id, &db.globals, &db.account_data, - )?; + ).await?; db.transaction_ids .add_txnid(sender_id, device_id, &body.txn_id, event_id.as_bytes())?; diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index c1c0253..be893e1 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -19,7 +19,7 @@ use std::convert::TryInto; feature = "conduit_bin", put("/_matrix/client/r0/profile/<_>/displayname", data = "") )] -pub fn set_displayname_route( +pub async fn set_displayname_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -64,7 +64,7 @@ pub fn set_displayname_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // Presence update db.rooms.edus.update_presence( @@ -110,7 +110,7 @@ pub fn get_displayname_route( feature = "conduit_bin", put("/_matrix/client/r0/profile/<_>/avatar_url", data = "") )] -pub fn set_avatar_url_route( +pub async fn set_avatar_url_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -167,7 +167,7 @@ pub fn set_avatar_url_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // Presence update db.rooms.edus.update_presence( diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index 5117348..701fc00 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -12,7 +12,7 @@ use rocket::put; feature = "conduit_bin", put("/_matrix/client/r0/rooms/<_>/redact/<_>/<_>", data = "") )] -pub fn redact_event_route( +pub async fn redact_event_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -33,7 +33,7 @@ pub fn redact_event_route( &body.room_id, &db.globals, &db.account_data, - )?; + ).await?; Ok(redact_event::Response { event_id }.into()) } diff --git a/src/client_server/room.rs b/src/client_server/room.rs index a5280cf..0e5c571 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -20,7 +20,7 @@ use rocket::{get, post}; feature = "conduit_bin", post("/_matrix/client/r0/createRoom", data = "") )] -pub fn create_room_route( +pub async fn create_room_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -65,7 +65,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // 2. Let the room creator join db.rooms.build_and_append_pdu( @@ -87,7 +87,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // 3. Power levels let mut users = BTreeMap::new(); @@ -129,7 +129,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // 4. Events set by preset @@ -162,7 +162,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // 4.2 History Visibility db.rooms.build_and_append_pdu( @@ -180,7 +180,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // 4.3 Guest Access db.rooms.build_and_append_pdu( @@ -206,7 +206,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; // 5. Events listed in initial_state for event in &body.initial_state { @@ -226,7 +226,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; } // 6. Events implied by name and topic @@ -248,7 +248,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; } if let Some(topic) = &body.topic { @@ -267,7 +267,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; } // 7. Events implied by invite (and TODO: invite_3pid) @@ -291,7 +291,7 @@ pub fn create_room_route( &room_id, &db.globals, &db.account_data, - )?; + ).await?; } // Homeserver specific stuff @@ -337,7 +337,7 @@ pub fn get_room_event_route( feature = "conduit_bin", post("/_matrix/client/r0/rooms/<_room_id>/upgrade", data = "") )] -pub fn upgrade_room_route( +pub async fn upgrade_room_route( db: State<'_, Database>, body: Ruma>, _room_id: String, @@ -379,7 +379,7 @@ pub fn upgrade_room_route( &body.room_id, &db.globals, &db.account_data, - )?; + ).await?; // Get the old room federations status let federate = serde_json::from_value::>( @@ -419,7 +419,7 @@ pub fn upgrade_room_route( &replacement_room, &db.globals, &db.account_data, - )?; + ).await?; // Join the new room db.rooms.build_and_append_pdu( @@ -441,7 +441,7 @@ pub fn upgrade_room_route( &replacement_room, &db.globals, &db.account_data, - )?; + ).await?; // Recommended transferable state events list from the specs let transferable_state_events = vec![ @@ -475,7 +475,7 @@ pub fn upgrade_room_route( &replacement_room, &db.globals, &db.account_data, - )?; + ).await?; } // Moves any local aliases to the new room @@ -505,7 +505,7 @@ pub fn upgrade_room_route( power_levels_event_content.invite = new_level; // Modify the power levels in the old room to prevent sending of events and inviting new users - db.rooms + let _ = db.rooms .build_and_append_pdu( PduBuilder { event_type: EventType::RoomPowerLevels, @@ -519,8 +519,7 @@ pub fn upgrade_room_route( &body.room_id, &db.globals, &db.account_data, - ) - .ok(); + ).await; // Return the replacement room id Ok(upgrade_room::Response { replacement_room }.into()) diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 1fe3cd6..e9d20e2 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -19,7 +19,7 @@ use rocket::{get, put}; feature = "conduit_bin", put("/_matrix/client/r0/rooms/<_>/state/<_>/<_>", data = "") )] -pub fn send_state_event_for_key_route( +pub async fn send_state_event_for_key_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -41,7 +41,7 @@ pub fn send_state_event_for_key_route( content, &body.room_id, Some(body.state_key.to_owned()), - )?) + ).await?) .into(), ) } @@ -50,7 +50,7 @@ pub fn send_state_event_for_key_route( feature = "conduit_bin", put("/_matrix/client/r0/rooms/<_>/state/<_>", data = "") )] -pub fn send_state_event_for_empty_key_route( +pub async fn send_state_event_for_empty_key_route( db: State<'_, Database>, body: Ruma>, ) -> ConduitResult { @@ -80,7 +80,7 @@ pub fn send_state_event_for_empty_key_route( json, &body.room_id, Some("".into()), - )?) + ).await?) .into(), ) } @@ -177,7 +177,7 @@ pub fn get_state_events_for_empty_key_route( .into()) } -pub fn send_state_event_for_key_helper( +pub async fn send_state_event_for_key_helper( db: &Database, sender: &UserId, content: &AnyStateEventContent, @@ -223,7 +223,7 @@ pub fn send_state_event_for_key_helper( &room_id, &db.globals, &db.account_data, - )?; + ).await?; Ok(event_id) } diff --git a/src/database.rs b/src/database.rs index 83f30c9..e1a356c 100644 --- a/src/database.rs +++ b/src/database.rs @@ -109,6 +109,7 @@ impl Database { tokenids: db.open_tree("tokenids")?, + roomserverids: db.open_tree("roomserverids")?, userroomid_joined: db.open_tree("userroomid_joined")?, roomuserid_joined: db.open_tree("roomuserid_joined")?, roomuseroncejoinedids: db.open_tree("roomuseroncejoinedids")?, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index b538c85..ba54e7f 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -2,11 +2,12 @@ mod edus; pub use edus::RoomEdus; -use crate::{pdu::PduBuilder, utils, Error, PduEvent, Result}; +use crate::{pdu::PduBuilder, server_server, utils, Error, PduEvent, Result}; use log::error; use ring::digest; use ruma::{ api::client::error::ErrorKind, + api::federation, events::{ ignored_user_list, room::{ @@ -15,7 +16,7 @@ use ruma::{ }, EventType, }, - EventId, Raw, RoomAliasId, RoomId, UserId, + EventId, Raw, RoomAliasId, RoomId, ServerName, UserId, }; use sled::IVec; use state_res::{event_auth, Error as StateError, Requester, StateEvent, StateMap, StateStore}; @@ -25,6 +26,7 @@ use std::{ convert::{TryFrom, TryInto}, mem, sync::Arc, + time::SystemTime, }; /// The unique identifier of each state group. @@ -44,6 +46,8 @@ pub struct Rooms { pub(super) tokenids: sled::Tree, // TokenId = RoomId + Token + PduId + /// Participating servers in a room. + pub(super) roomserverids: sled::Tree, // RoomServerId = RoomId + ServerName pub(super) userroomid_joined: sled::Tree, pub(super) roomuserid_joined: sled::Tree, pub(super) roomuseroncejoinedids: sled::Tree, @@ -169,8 +173,7 @@ impl Rooms { Ok(events) } - // This fetches auth events from the current state using the - /// full `roomstateid_pdu` tree. + /// This fetches auth events from the current state. pub fn get_auth_events( &self, room_id: &RoomId, @@ -472,17 +475,10 @@ impl Rooms { pub fn append_pdu( &self, pdu: &PduEvent, + pdu_json: &serde_json::Value, globals: &super::globals::Globals, account_data: &super::account_data::AccountData, ) -> Result> { - let mut pdu_json = serde_json::to_value(&pdu).expect("event is valid, we just created it"); - ruma::signatures::hash_and_sign_event( - globals.server_name().as_str(), - globals.keypair(), - &mut pdu_json, - ) - .expect("event is valid, we just created it"); - self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?; // Increment the last index and use that @@ -610,7 +606,7 @@ impl Rooms { } /// Creates a new persisted data unit and adds it to a room. - pub fn build_and_append_pdu( + pub async fn build_and_append_pdu( &self, pdu_builder: PduBuilder, sender: &UserId, @@ -799,22 +795,59 @@ impl Rooms { signatures: BTreeMap::new(), }; + // Hash and sign + let mut pdu_json = serde_json::to_value(&pdu).expect("event is valid, we just created it"); + pdu_json + .as_object_mut() + .expect("json is object") + .remove("event_id"); + + ruma::signatures::hash_and_sign_event( + globals.server_name().as_str(), + globals.keypair(), + &mut pdu_json, + ) + .expect("event is valid, we just created it"); + // Generate event id pdu.event_id = EventId::try_from(&*format!( "${}", - ruma::signatures::reference_hash( - &serde_json::to_value(&pdu).expect("event is valid, we just created it") - ) - .expect("ruma can calculate reference hashes") + ruma::signatures::reference_hash(&pdu_json) + .expect("ruma can calculate reference hashes") )) .expect("ruma's reference hashes are valid event ids"); - let pdu_id = self.append_pdu(&pdu, globals, account_data)?; + pdu_json + .as_object_mut() + .expect("json is object") + .insert("event_id".to_owned(), pdu.event_id.to_string().into()); + + let pdu_id = self.append_pdu(&pdu, &pdu_json, globals, account_data)?; if pdu.state_key.is_some() { self.append_to_state(&pdu_id, &pdu)?; } + pdu_json + .as_object_mut() + .expect("json is object") + .remove("event_id"); + + let response = server_server::send_request( + &globals, + "koesters.xyz".try_into().unwrap(), + federation::transactions::send_transaction_message::v1::Request { + origin: globals.server_name(), + pdus: &[serde_json::from_value(pdu_json).expect("Raw::from_value always works")], + edus: &[], + origin_server_ts: SystemTime::now(), + transaction_id: &utils::random_string(16), + }, + ) + .await; + + let _ = dbg!(response); + Ok(pdu.event_id) } @@ -957,12 +990,17 @@ impl Rooms { &self, room_id: &RoomId, user_id: &UserId, - mut member_content: member::MemberEventContent, + member_content: member::MemberEventContent, sender: &UserId, account_data: &super::account_data::AccountData, globals: &super::globals::Globals, ) -> Result<()> { let membership = member_content.membership; + + let mut roomserver_id = room_id.as_bytes().to_vec(); + roomserver_id.push(0xff); + roomserver_id.extend_from_slice(user_id.server_name().as_bytes()); + let mut userroom_id = user_id.as_bytes().to_vec(); userroom_id.push(0xff); userroom_id.extend_from_slice(room_id.as_bytes()); @@ -1056,6 +1094,7 @@ impl Rooms { } } + self.roomserverids.insert(&roomserver_id, &[])?; self.userroomid_joined.insert(&userroom_id, &[])?; self.roomuserid_joined.insert(&roomuser_id, &[])?; self.userroomid_invited.remove(&userroom_id)?; @@ -1075,25 +1114,10 @@ impl Rooms { }); if is_ignored { - member_content.membership = member::MembershipState::Leave; - - self.build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMember, - content: serde_json::to_value(member_content) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: Some(user_id.to_string()), - redacts: None, - }, - &user_id, - &room_id, - globals, - account_data, - )?; - return Ok(()); } + + self.roomserverids.insert(&roomserver_id, &[])?; self.userroomid_invited.insert(&userroom_id, &[])?; self.roomuserid_invited.insert(&roomuser_id, &[])?; self.userroomid_joined.remove(&userroom_id)?; @@ -1101,6 +1125,14 @@ impl Rooms { self.userroomid_left.remove(&userroom_id)?; } member::MembershipState::Leave | member::MembershipState::Ban => { + if self + .room_members(room_id) + .chain(self.room_members_invited(room_id)) + .filter_map(|r| r.ok()) + .all(|u| u.server_name() != user_id.server_name()) + { + self.roomserverids.remove(&roomserver_id)?; + } self.userroomid_left.insert(&userroom_id, &[])?; self.userroomid_joined.remove(&userroom_id)?; self.roomuserid_joined.remove(&roomuser_id)?; @@ -1294,10 +1326,34 @@ impl Rooms { }) } + /// Returns an iterator over all joined members of a room. + pub fn room_servers(&self, room_id: &RoomId) -> impl Iterator>> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + + self.roomserverids.scan_prefix(prefix).keys().map(|key| { + Ok(Box::::try_from( + utils::string_from_bytes( + &key? + .rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| { + Error::bad_database("Server name in roomserverids is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Server name in roomserverids is invalid."))?) + }) + } + /// Returns an iterator over all joined members of a room. pub fn room_members(&self, room_id: &RoomId) -> impl Iterator> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + self.roomuserid_joined - .scan_prefix(room_id.as_bytes()) + .scan_prefix(prefix) .keys() .map(|key| { Ok(UserId::try_from( @@ -1317,8 +1373,11 @@ impl Rooms { /// Returns an iterator over all User IDs who ever joined a room. pub fn room_useroncejoined(&self, room_id: &RoomId) -> impl Iterator> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + self.roomuseroncejoinedids - .scan_prefix(room_id.to_string()) + .scan_prefix(prefix) .keys() .map(|key| { Ok(UserId::try_from( @@ -1338,8 +1397,11 @@ impl Rooms { /// Returns an iterator over all invited members of a room. pub fn room_members_invited(&self, room_id: &RoomId) -> impl Iterator> { + let mut prefix = room_id.as_bytes().to_vec(); + prefix.push(0xff); + self.roomuserid_invited - .scan_prefix(room_id.as_bytes()) + .scan_prefix(prefix) .keys() .map(|key| { Ok(UserId::try_from( @@ -1380,8 +1442,11 @@ impl Rooms { /// Returns an iterator over all rooms a user was invited to. pub fn rooms_invited(&self, user_id: &UserId) -> impl Iterator> { + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + self.userroomid_invited - .scan_prefix(&user_id.as_bytes()) + .scan_prefix(prefix) .keys() .map(|key| { Ok(RoomId::try_from( @@ -1401,23 +1466,23 @@ impl Rooms { /// Returns an iterator over all rooms a user left. pub fn rooms_left(&self, user_id: &UserId) -> impl Iterator> { - self.userroomid_left - .scan_prefix(&user_id.as_bytes()) - .keys() - .map(|key| { - Ok(RoomId::try_from( - utils::string_from_bytes( - &key? - .rsplit(|&b| b == 0xff) - .next() - .expect("rsplit always returns an element"), - ) - .map_err(|_| { - Error::bad_database("Room ID in userroomid_left is invalid unicode.") - })?, + let mut prefix = user_id.as_bytes().to_vec(); + prefix.push(0xff); + + self.userroomid_left.scan_prefix(prefix).keys().map(|key| { + Ok(RoomId::try_from( + utils::string_from_bytes( + &key? + .rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), ) - .map_err(|_| Error::bad_database("Room ID in userroomid_left is invalid."))?) - }) + .map_err(|_| { + Error::bad_database("Room ID in userroomid_left is invalid unicode.") + })?, + ) + .map_err(|_| Error::bad_database("Room ID in userroomid_left is invalid."))?) + }) } pub fn once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result { diff --git a/src/pdu.rs b/src/pdu.rs index 7f842e2..c904230 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -6,7 +6,7 @@ use ruma::{ AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, StateEvent, }, EventId, Raw, RoomId, ServerKeyId, ServerName, UserId, -}; +events::pdu::PduStub}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::{collections::BTreeMap, convert::TryInto, sync::Arc, time::UNIX_EPOCH}; @@ -198,6 +198,31 @@ impl PduEvent { serde_json::from_value(json).expect("Raw::from_value always works") } + + pub fn to_outgoing_federation_event(&self) -> Raw { + let mut json = json!({ + "room_id": self.room_id, + "sender": self.sender, + "origin_server_ts": self.origin_server_ts, + "type": self.kind, + "content": self.content, + "prev_events": self.prev_events, + "depth": self.depth, + "auth_events": self.auth_events, + "unsigned": self.unsigned, + "hashes": self.hashes, + "signatures": self.signatures, + }); + + if let Some(state_key) = &self.state_key { + json["state_key"] = json!(state_key); + } + if let Some(redacts) = &self.redacts { + json["redacts"] = json!(redacts); + } + + serde_json::from_value(json).expect("Raw::from_value always works") + } } impl From<&state_res::StateEvent> for PduEvent { diff --git a/src/server_server.rs b/src/server_server.rs index 6c53aed..9f4be13 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -24,9 +24,9 @@ use std::{ time::{Duration, SystemTime}, }; -pub async fn request_well_known(db: &crate::Database, destination: &str) -> Option { +pub async fn request_well_known(globals: &crate::database::globals::Globals, destination: &str) -> Option { let body: serde_json::Value = serde_json::from_str( - &db.globals + &globals .reqwest_client() .get(&format!( "https://{}/.well-known/matrix/server", @@ -44,7 +44,7 @@ pub async fn request_well_known(db: &crate::Database, destination: &str) -> Opti } pub async fn send_request( - db: &crate::Database, + globals: &crate::database::globals::Globals, destination: &ServerName, request: T, ) -> Result @@ -52,7 +52,7 @@ where T: Debug, { let actual_destination = "https://".to_owned() - + &request_well_known(db, &destination.as_str()) + + &request_well_known(globals, &destination.as_str()) .await .unwrap_or(destination.as_str().to_owned() + ":8448"); @@ -81,14 +81,14 @@ where ); request_map.insert( "origin".to_owned(), - db.globals.server_name().as_str().into(), + globals.server_name().as_str().into(), ); request_map.insert("destination".to_owned(), destination.as_str().into()); let mut request_json = request_map.into(); ruma::signatures::sign_json( - db.globals.server_name().as_str(), - db.globals.keypair(), + globals.server_name().as_str(), + globals.keypair(), &mut request_json, ) .unwrap(); @@ -110,7 +110,7 @@ where AUTHORIZATION, HeaderValue::from_str(&format!( "X-Matrix origin={},key=\"{}\",sig=\"{}\"", - db.globals.server_name(), + globals.server_name(), s.0, s.1 )) @@ -122,7 +122,7 @@ where let reqwest_request = reqwest::Request::try_from(http_request) .expect("all http requests are valid reqwest requests"); - let reqwest_response = db.globals.reqwest_client().execute(reqwest_request).await; + let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; // Because reqwest::Response -> http::Response is complicated: match reqwest_response { @@ -317,9 +317,9 @@ pub fn send_transaction_message_route<'a>( .insert("event_id".to_owned(), event_id.to_string().into()); let pdu = - serde_json::from_value::(value).expect("all ruma pdus are conduit pdus"); + serde_json::from_value::(value.clone()).expect("all ruma pdus are conduit pdus"); if db.rooms.exists(&pdu.room_id)? { - db.rooms.append_pdu(&pdu, &db.globals, &db.account_data)?; + db.rooms.append_pdu(&pdu, &value, &db.globals, &db.account_data)?; } } Ok(send_transaction_message::v1::Response {