diff --git a/Cargo.lock b/Cargo.lock index 03b1731..8037aa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2062,8 +2062,8 @@ dependencies = [ [[package]] name = "ruma" -version = "0.3.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +version = "0.4.0" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "assign", "js_int", @@ -2084,7 +2084,7 @@ dependencies = [ [[package]] name = "ruma-api" version = "0.18.3" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "bytes", "http", @@ -2100,7 +2100,7 @@ dependencies = [ [[package]] name = "ruma-api-macros" version = "0.18.3" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2111,7 +2111,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.4.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "ruma-api", "ruma-common", @@ -2125,7 +2125,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.12.2" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "assign", "bytes", @@ -2145,7 +2145,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.6.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "indexmap", "js_int", @@ -2159,8 +2159,8 @@ dependencies = [ [[package]] name = "ruma-events" -version = "0.24.4" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +version = "0.24.5" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "indoc", "js_int", @@ -2175,8 +2175,8 @@ dependencies = [ [[package]] name = "ruma-events-macros" -version = "0.24.4" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +version = "0.24.5" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2186,8 +2186,8 @@ dependencies = [ [[package]] name = "ruma-federation-api" -version = "0.3.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +version = "0.3.1" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "js_int", "ruma-api", @@ -2202,7 +2202,7 @@ dependencies = [ [[package]] name = "ruma-identifiers" version = "0.20.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "paste", "rand 0.8.4", @@ -2216,7 +2216,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" version = "0.20.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "quote", "ruma-identifiers-validation", @@ -2226,7 +2226,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.5.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "thiserror", ] @@ -2234,7 +2234,7 @@ dependencies = [ [[package]] name = "ruma-identity-service-api" version = "0.3.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "js_int", "ruma-api", @@ -2247,7 +2247,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.3.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "js_int", "ruma-api", @@ -2262,7 +2262,7 @@ dependencies = [ [[package]] name = "ruma-serde" version = "0.5.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "bytes", "form_urlencoded", @@ -2276,7 +2276,7 @@ dependencies = [ [[package]] name = "ruma-serde-macros" version = "0.5.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -2287,7 +2287,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.9.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "base64 0.13.0", "ed25519-dalek", @@ -2303,12 +2303,11 @@ dependencies = [ [[package]] name = "ruma-state-res" -version = "0.3.0" -source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c" +version = "0.4.0" +source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71" dependencies = [ "itertools 0.10.1", "js_int", - "maplit", "ruma-common", "ruma-events", "ruma-identifiers", diff --git a/Cargo.toml b/Cargo.toml index bb44d08..ca802e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ rocket = { version = "0.5.0-rc.1", features = ["tls"] } # Used to handle request # Used for matrix spec type definitions and helpers #ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { git = "https://github.com/ruma/ruma", rev = "f5ab038e22421ed338396ece977b6b2844772ced", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } -ruma = { git = "https://github.com/DevinR528/ruma", rev = "c7860fcb89dbde636e2c83d0636934fb9924f40c", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } +ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } #ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } # Used for long polling and federation sender, should be the same as rocket::tokio diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index c88e0a8..52e074c 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -5,7 +5,6 @@ use crate::{ server_server, utils, ConduitResult, Database, Error, Result, Ruma, }; use member::{MemberEventContent, MembershipState}; -use rocket::futures; use ruma::{ api::{ client::{ @@ -667,14 +666,19 @@ async fn join_room_by_id_helper( let mut state = HashMap::new(); let pub_key_map = RwLock::new(BTreeMap::new()); - for result in futures::future::join_all( - send_join_response - .room_state - .state - .iter() - .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db)), + server_server::fetch_join_signing_keys( + &send_join_response, + &room_version, + &pub_key_map, + &db, ) - .await + .await?; + + for result in send_join_response + .room_state + .state + .iter() + .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db)) { let (event_id, value) = match result { Ok(t) => t, @@ -723,14 +727,11 @@ async fn join_room_by_id_helper( &db, )?; - for result in futures::future::join_all( - send_join_response - .room_state - .auth_chain - .iter() - .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db)), - ) - .await + for result in send_join_response + .room_state + .auth_chain + .iter() + .map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db)) { let (event_id, value) = match result { Ok(t) => t, @@ -787,7 +788,7 @@ async fn join_room_by_id_helper( Ok(join_room_by_id::Response::new(room_id.clone()).into()) } -async fn validate_and_add_event_id( +fn validate_and_add_event_id( pdu: &Raw, room_version: &RoomVersionId, pub_key_map: &RwLock>>, @@ -830,7 +831,6 @@ async fn validate_and_add_event_id( } } - server_server::fetch_required_signing_keys(&value, pub_key_map, db).await?; if let Err(e) = ruma::signatures::verify_event( &*pub_key_map .read() diff --git a/src/database/globals.rs b/src/database/globals.rs index 6d11f49..048b9b8 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -227,7 +227,11 @@ impl Globals { /// Remove the outdated keys and insert the new ones. /// /// This doesn't actually check that the keys provided are newer than the old set. - pub fn add_signing_key(&self, origin: &ServerName, new_keys: ServerSigningKeys) -> Result<()> { + pub fn add_signing_key( + &self, + origin: &ServerName, + new_keys: ServerSigningKeys, + ) -> Result> { // Not atomic, but this is not critical let signingkeys = self.server_signingkeys.get(origin.as_bytes())?; @@ -252,7 +256,14 @@ impl Globals { &serde_json::to_vec(&keys).expect("serversigningkeys can be serialized"), )?; - Ok(()) + let mut tree = keys.verify_keys; + tree.extend( + keys.old_verify_keys + .into_iter() + .map(|old| (old.0, VerifyKey::new(old.1.key))), + ); + + Ok(tree) } /// This returns an empty `Ok(BTreeMap<..>)` when there are no keys found for the server. diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index ff28436..14146fb 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -422,7 +422,7 @@ impl RoomEdus { } /// Sets all users to offline who have been quiet for too long. - fn presence_maintain( + fn _presence_maintain( &self, rooms: &super::Rooms, globals: &super::super::globals::Globals, @@ -489,13 +489,13 @@ impl RoomEdus { } /// Returns an iterator over the most recent presence updates that happened after the event with id `since`. - #[tracing::instrument(skip(self, globals, rooms))] + #[tracing::instrument(skip(self, since, _rooms, _globals))] pub fn presence_since( &self, room_id: &RoomId, since: u64, - rooms: &super::Rooms, - globals: &super::super::globals::Globals, + _rooms: &super::Rooms, + _globals: &super::super::globals::Globals, ) -> Result> { //self.presence_maintain(rooms, globals)?; diff --git a/src/server_server.rs b/src/server_server.rs index b81610e..60998a7 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -6,7 +6,10 @@ use crate::{ use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION}; use regex::Regex; -use rocket::response::content::Json; +use rocket::{ + futures::{prelude::*, stream::FuturesUnordered}, + response::content::Json, +}; use ruma::{ api::{ client::error::{Error as RumaError, ErrorKind}, @@ -15,8 +18,9 @@ use ruma::{ device::get_devices::{self, v1::UserDevice}, directory::{get_public_rooms, get_public_rooms_filtered}, discovery::{ - get_remote_server_keys, get_server_keys, get_server_version, ServerSigningKeys, - VerifyKey, + get_remote_server_keys, get_remote_server_keys_batch, + get_remote_server_keys_batch::v2::QueryCriteria, get_server_keys, + get_server_version, ServerSigningKeys, VerifyKey, }, event::{get_event, get_missing_events, get_room_state, get_room_state_ids}, keys::{claim_keys, get_keys}, @@ -35,6 +39,7 @@ use ruma::{ }, directory::{IncomingFilter, IncomingRoomNetwork}, events::{ + pdu::Pdu, receipt::{ReceiptEvent, ReceiptEventContent}, room::{ create::CreateEventContent, @@ -59,7 +64,7 @@ use std::{ net::{IpAddr, SocketAddr}, pin::Pin, result::Result as StdResult, - sync::{Arc, RwLock}, + sync::{Arc, RwLock, RwLockWriteGuard}, time::{Duration, Instant, SystemTime}, }; use tokio::sync::{MutexGuard, Semaphore}; @@ -566,7 +571,7 @@ pub fn get_server_keys_route(db: DatabaseGuard) -> Json { old_verify_keys: BTreeMap::new(), signatures: BTreeMap::new(), valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time( - SystemTime::now() + Duration::from_secs(60 * 2), + SystemTime::now() + Duration::from_secs(86400 * 7), ) .expect("time is valid"), }, @@ -3277,6 +3282,204 @@ pub(crate) async fn fetch_required_signing_keys( Ok(()) } +// Gets a list of servers for which we don't have the signing key yet. We go over +// the PDUs and either cache the key or add it to the list that needs to be retrieved. +fn get_server_keys_from_cache( + pdu: &Raw, + servers: &mut BTreeMap, BTreeMap>, + room_version: &RoomVersionId, + pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap>>, + db: &Database, +) -> Result<()> { + let value = serde_json::from_str::(pdu.json().get()).map_err(|e| { + error!("Invalid PDU in server response: {:?}: {:?}", pdu, e); + Error::BadServerResponse("Invalid PDU in server response") + })?; + + let event_id = EventId::try_from(&*format!( + "${}", + ruma::signatures::reference_hash(&value, &room_version) + .expect("ruma can calculate reference hashes") + )) + .expect("ruma's reference hashes are valid event ids"); + + if let Some((time, tries)) = db + .globals + .bad_event_ratelimiter + .read() + .unwrap() + .get(&event_id) + { + // Exponential backoff + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + debug!("Backing off from {}", event_id); + return Err(Error::BadServerResponse("bad event, still backing off")); + } + } + + let signatures = value + .get("signatures") + .ok_or(Error::BadServerResponse( + "No signatures in server response pdu.", + ))? + .as_object() + .ok_or(Error::BadServerResponse( + "Invalid signatures object in server response pdu.", + ))?; + + for (signature_server, signature) in signatures { + let signature_object = signature.as_object().ok_or(Error::BadServerResponse( + "Invalid signatures content object in server response pdu.", + ))?; + + let signature_ids = signature_object.keys().cloned().collect::>(); + + let contains_all_ids = + |keys: &BTreeMap| signature_ids.iter().all(|id| keys.contains_key(id)); + + let origin = &Box::::try_from(&**signature_server).map_err(|_| { + Error::BadServerResponse("Invalid servername in signatures of server response pdu.") + })?; + + if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) { + continue; + } + + trace!("Loading signing keys for {}", origin); + + let result = db + .globals + .signing_keys_for(origin)? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect::>(); + + if !contains_all_ids(&result) { + trace!("Signing key not loaded for {}", origin); + servers.insert( + origin.clone(), + BTreeMap::::new(), + ); + } + + pub_key_map.insert(origin.to_string(), result); + } + + Ok(()) +} + +pub(crate) async fn fetch_join_signing_keys( + event: &create_join_event::v2::Response, + room_version: &RoomVersionId, + pub_key_map: &RwLock>>, + db: &Database, +) -> Result<()> { + let mut servers = + BTreeMap::, BTreeMap>::new(); + + { + let mut pkm = pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))?; + + // Try to fetch keys, failure is okay + // Servers we couldn't find in the cache will be added to `servers` + for pdu in &event.room_state.state { + let _ = get_server_keys_from_cache(pdu, &mut servers, &room_version, &mut pkm, &db); + } + for pdu in &event.room_state.auth_chain { + let _ = get_server_keys_from_cache(pdu, &mut servers, &room_version, &mut pkm, &db); + } + + drop(pkm); + } + + if servers.is_empty() { + // We had all keys locally + return Ok(()); + } + + for server in db.globals.trusted_servers() { + trace!("Asking batch signing keys from trusted server {}", server); + if let Ok(keys) = db + .sending + .send_federation_request( + &db.globals, + server, + get_remote_server_keys_batch::v2::Request { + server_keys: servers.clone(), + minimum_valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time( + SystemTime::now() + Duration::from_secs(60), + ) + .expect("time is valid"), + }, + ) + .await + { + trace!("Got signing keys: {:?}", keys); + let mut pkm = pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))?; + for k in keys.server_keys { + // TODO: Check signature from trusted server? + servers.remove(&k.server_name); + + let result = db + .globals + .add_signing_key(&k.server_name, k.clone())? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect::>(); + + pkm.insert(k.server_name.to_string(), result); + } + } + + if servers.is_empty() { + return Ok(()); + } + } + + let mut futures = servers + .into_iter() + .map(|(server, _)| async move { + ( + db.sending + .send_federation_request( + &db.globals, + &server, + get_server_keys::v2::Request::new(), + ) + .await, + server, + ) + }) + .collect::>(); + + while let Some(result) = futures.next().await { + if let (Ok(get_keys_response), origin) = result { + let result = db + .globals + .add_signing_key(&origin, get_keys_response.server_key.clone())? + .into_iter() + .map(|(k, v)| (k.to_string(), v.key)) + .collect::>(); + + pub_key_map + .write() + .map_err(|_| Error::bad_database("RwLock is poisoned."))? + .insert(origin.to_string(), result); + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::{add_port_to_hostname, get_ip_with_port, FedDest};