Merge branch 'batch_keys' into 'master'

Get required keys in batch when joining a room

See merge request famedly/conduit!182
This commit is contained in:
Timo Kösters 2021-09-01 19:12:21 +00:00
commit 27788af022
6 changed files with 267 additions and 54 deletions

47
Cargo.lock generated
View file

@ -2062,8 +2062,8 @@ dependencies = [
[[package]]
name = "ruma"
version = "0.3.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
version = "0.4.0"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"assign",
"js_int",
@ -2084,7 +2084,7 @@ dependencies = [
[[package]]
name = "ruma-api"
version = "0.18.3"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"bytes",
"http",
@ -2100,7 +2100,7 @@ dependencies = [
[[package]]
name = "ruma-api-macros"
version = "0.18.3"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -2111,7 +2111,7 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.4.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"ruma-api",
"ruma-common",
@ -2125,7 +2125,7 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.12.2"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"assign",
"bytes",
@ -2145,7 +2145,7 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.6.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"indexmap",
"js_int",
@ -2159,8 +2159,8 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.24.4"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
version = "0.24.5"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"indoc",
"js_int",
@ -2175,8 +2175,8 @@ dependencies = [
[[package]]
name = "ruma-events-macros"
version = "0.24.4"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
version = "0.24.5"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -2186,8 +2186,8 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.3.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
version = "0.3.1"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"js_int",
"ruma-api",
@ -2202,7 +2202,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers"
version = "0.20.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"paste",
"rand 0.8.4",
@ -2216,7 +2216,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-macros"
version = "0.20.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"quote",
"ruma-identifiers-validation",
@ -2226,7 +2226,7 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.5.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"thiserror",
]
@ -2234,7 +2234,7 @@ dependencies = [
[[package]]
name = "ruma-identity-service-api"
version = "0.3.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"js_int",
"ruma-api",
@ -2247,7 +2247,7 @@ dependencies = [
[[package]]
name = "ruma-push-gateway-api"
version = "0.3.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"js_int",
"ruma-api",
@ -2262,7 +2262,7 @@ dependencies = [
[[package]]
name = "ruma-serde"
version = "0.5.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"bytes",
"form_urlencoded",
@ -2276,7 +2276,7 @@ dependencies = [
[[package]]
name = "ruma-serde-macros"
version = "0.5.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -2287,7 +2287,7 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.9.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"base64 0.13.0",
"ed25519-dalek",
@ -2303,12 +2303,11 @@ dependencies = [
[[package]]
name = "ruma-state-res"
version = "0.3.0"
source = "git+https://github.com/DevinR528/ruma?rev=c7860fcb89dbde636e2c83d0636934fb9924f40c#c7860fcb89dbde636e2c83d0636934fb9924f40c"
version = "0.4.0"
source = "git+https://github.com/timokoesters/ruma?rev=50c1db7e0a3a21fc794b0cce3b64285a4c750c71#50c1db7e0a3a21fc794b0cce3b64285a4c750c71"
dependencies = [
"itertools 0.10.1",
"js_int",
"maplit",
"ruma-common",
"ruma-events",
"ruma-identifiers",

View file

@ -20,7 +20,7 @@ rocket = { version = "0.5.0-rc.1", features = ["tls"] } # Used to handle request
# Used for matrix spec type definitions and helpers
#ruma = { version = "0.4.0", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
#ruma = { git = "https://github.com/ruma/ruma", rev = "f5ab038e22421ed338396ece977b6b2844772ced", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
ruma = { git = "https://github.com/DevinR528/ruma", rev = "c7860fcb89dbde636e2c83d0636934fb9924f40c", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
ruma = { git = "https://github.com/timokoesters/ruma", rev = "50c1db7e0a3a21fc794b0cce3b64285a4c750c71", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
#ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] }
# Used for long polling and federation sender, should be the same as rocket::tokio

View file

@ -5,7 +5,6 @@ use crate::{
server_server, utils, ConduitResult, Database, Error, Result, Ruma,
};
use member::{MemberEventContent, MembershipState};
use rocket::futures;
use ruma::{
api::{
client::{
@ -667,14 +666,19 @@ async fn join_room_by_id_helper(
let mut state = HashMap::new();
let pub_key_map = RwLock::new(BTreeMap::new());
for result in futures::future::join_all(
send_join_response
.room_state
.state
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db)),
server_server::fetch_join_signing_keys(
&send_join_response,
&room_version,
&pub_key_map,
&db,
)
.await
.await?;
for result in send_join_response
.room_state
.state
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db))
{
let (event_id, value) = match result {
Ok(t) => t,
@ -723,14 +727,11 @@ async fn join_room_by_id_helper(
&db,
)?;
for result in futures::future::join_all(
send_join_response
.room_state
.auth_chain
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db)),
)
.await
for result in send_join_response
.room_state
.auth_chain
.iter()
.map(|pdu| validate_and_add_event_id(pdu, &room_version, &pub_key_map, &db))
{
let (event_id, value) = match result {
Ok(t) => t,
@ -787,7 +788,7 @@ async fn join_room_by_id_helper(
Ok(join_room_by_id::Response::new(room_id.clone()).into())
}
async fn validate_and_add_event_id(
fn validate_and_add_event_id(
pdu: &Raw<Pdu>,
room_version: &RoomVersionId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
@ -830,7 +831,6 @@ async fn validate_and_add_event_id(
}
}
server_server::fetch_required_signing_keys(&value, pub_key_map, db).await?;
if let Err(e) = ruma::signatures::verify_event(
&*pub_key_map
.read()

View file

@ -227,7 +227,11 @@ impl Globals {
/// Remove the outdated keys and insert the new ones.
///
/// This doesn't actually check that the keys provided are newer than the old set.
pub fn add_signing_key(&self, origin: &ServerName, new_keys: ServerSigningKeys) -> Result<()> {
pub fn add_signing_key(
&self,
origin: &ServerName,
new_keys: ServerSigningKeys,
) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> {
// Not atomic, but this is not critical
let signingkeys = self.server_signingkeys.get(origin.as_bytes())?;
@ -252,7 +256,14 @@ impl Globals {
&serde_json::to_vec(&keys).expect("serversigningkeys can be serialized"),
)?;
Ok(())
let mut tree = keys.verify_keys;
tree.extend(
keys.old_verify_keys
.into_iter()
.map(|old| (old.0, VerifyKey::new(old.1.key))),
);
Ok(tree)
}
/// This returns an empty `Ok(BTreeMap<..>)` when there are no keys found for the server.

View file

@ -422,7 +422,7 @@ impl RoomEdus {
}
/// Sets all users to offline who have been quiet for too long.
fn presence_maintain(
fn _presence_maintain(
&self,
rooms: &super::Rooms,
globals: &super::super::globals::Globals,
@ -489,13 +489,13 @@ impl RoomEdus {
}
/// Returns an iterator over the most recent presence updates that happened after the event with id `since`.
#[tracing::instrument(skip(self, globals, rooms))]
#[tracing::instrument(skip(self, since, _rooms, _globals))]
pub fn presence_since(
&self,
room_id: &RoomId,
since: u64,
rooms: &super::Rooms,
globals: &super::super::globals::Globals,
_rooms: &super::Rooms,
_globals: &super::super::globals::Globals,
) -> Result<HashMap<UserId, PresenceEvent>> {
//self.presence_maintain(rooms, globals)?;

View file

@ -6,7 +6,10 @@ use crate::{
use get_profile_information::v1::ProfileField;
use http::header::{HeaderValue, AUTHORIZATION};
use regex::Regex;
use rocket::response::content::Json;
use rocket::{
futures::{prelude::*, stream::FuturesUnordered},
response::content::Json,
};
use ruma::{
api::{
client::error::{Error as RumaError, ErrorKind},
@ -15,8 +18,9 @@ use ruma::{
device::get_devices::{self, v1::UserDevice},
directory::{get_public_rooms, get_public_rooms_filtered},
discovery::{
get_remote_server_keys, get_server_keys, get_server_version, ServerSigningKeys,
VerifyKey,
get_remote_server_keys, get_remote_server_keys_batch,
get_remote_server_keys_batch::v2::QueryCriteria, get_server_keys,
get_server_version, ServerSigningKeys, VerifyKey,
},
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
keys::{claim_keys, get_keys},
@ -35,6 +39,7 @@ use ruma::{
},
directory::{IncomingFilter, IncomingRoomNetwork},
events::{
pdu::Pdu,
receipt::{ReceiptEvent, ReceiptEventContent},
room::{
create::CreateEventContent,
@ -59,7 +64,7 @@ use std::{
net::{IpAddr, SocketAddr},
pin::Pin,
result::Result as StdResult,
sync::{Arc, RwLock},
sync::{Arc, RwLock, RwLockWriteGuard},
time::{Duration, Instant, SystemTime},
};
use tokio::sync::{MutexGuard, Semaphore};
@ -566,7 +571,7 @@ pub fn get_server_keys_route(db: DatabaseGuard) -> Json<String> {
old_verify_keys: BTreeMap::new(),
signatures: BTreeMap::new(),
valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(60 * 2),
SystemTime::now() + Duration::from_secs(86400 * 7),
)
.expect("time is valid"),
},
@ -3277,6 +3282,204 @@ pub(crate) async fn fetch_required_signing_keys(
Ok(())
}
// Gets a list of servers for which we don't have the signing key yet. We go over
// the PDUs and either cache the key or add it to the list that needs to be retrieved.
fn get_server_keys_from_cache(
pdu: &Raw<Pdu>,
servers: &mut BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>,
room_version: &RoomVersionId,
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>,
db: &Database,
) -> Result<()> {
let value = serde_json::from_str::<CanonicalJsonObject>(pdu.json().get()).map_err(|e| {
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let event_id = EventId::try_from(&*format!(
"${}",
ruma::signatures::reference_hash(&value, &room_version)
.expect("ruma can calculate reference hashes")
))
.expect("ruma's reference hashes are valid event ids");
if let Some((time, tries)) = db
.globals
.bad_event_ratelimiter
.read()
.unwrap()
.get(&event_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {}", event_id);
return Err(Error::BadServerResponse("bad event, still backing off"));
}
}
let signatures = value
.get("signatures")
.ok_or(Error::BadServerResponse(
"No signatures in server response pdu.",
))?
.as_object()
.ok_or(Error::BadServerResponse(
"Invalid signatures object in server response pdu.",
))?;
for (signature_server, signature) in signatures {
let signature_object = signature.as_object().ok_or(Error::BadServerResponse(
"Invalid signatures content object in server response pdu.",
))?;
let signature_ids = signature_object.keys().cloned().collect::<Vec<_>>();
let contains_all_ids =
|keys: &BTreeMap<String, String>| signature_ids.iter().all(|id| keys.contains_key(id));
let origin = &Box::<ServerName>::try_from(&**signature_server).map_err(|_| {
Error::BadServerResponse("Invalid servername in signatures of server response pdu.")
})?;
if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) {
continue;
}
trace!("Loading signing keys for {}", origin);
let result = db
.globals
.signing_keys_for(origin)?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect::<BTreeMap<_, _>>();
if !contains_all_ids(&result) {
trace!("Signing key not loaded for {}", origin);
servers.insert(
origin.clone(),
BTreeMap::<ServerSigningKeyId, QueryCriteria>::new(),
);
}
pub_key_map.insert(origin.to_string(), result);
}
Ok(())
}
pub(crate) async fn fetch_join_signing_keys(
event: &create_join_event::v2::Response,
room_version: &RoomVersionId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
db: &Database,
) -> Result<()> {
let mut servers =
BTreeMap::<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>::new();
{
let mut pkm = pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?;
// Try to fetch keys, failure is okay
// Servers we couldn't find in the cache will be added to `servers`
for pdu in &event.room_state.state {
let _ = get_server_keys_from_cache(pdu, &mut servers, &room_version, &mut pkm, &db);
}
for pdu in &event.room_state.auth_chain {
let _ = get_server_keys_from_cache(pdu, &mut servers, &room_version, &mut pkm, &db);
}
drop(pkm);
}
if servers.is_empty() {
// We had all keys locally
return Ok(());
}
for server in db.globals.trusted_servers() {
trace!("Asking batch signing keys from trusted server {}", server);
if let Ok(keys) = db
.sending
.send_federation_request(
&db.globals,
server,
get_remote_server_keys_batch::v2::Request {
server_keys: servers.clone(),
minimum_valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(60),
)
.expect("time is valid"),
},
)
.await
{
trace!("Got signing keys: {:?}", keys);
let mut pkm = pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?;
for k in keys.server_keys {
// TODO: Check signature from trusted server?
servers.remove(&k.server_name);
let result = db
.globals
.add_signing_key(&k.server_name, k.clone())?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect::<BTreeMap<_, _>>();
pkm.insert(k.server_name.to_string(), result);
}
}
if servers.is_empty() {
return Ok(());
}
}
let mut futures = servers
.into_iter()
.map(|(server, _)| async move {
(
db.sending
.send_federation_request(
&db.globals,
&server,
get_server_keys::v2::Request::new(),
)
.await,
server,
)
})
.collect::<FuturesUnordered<_>>();
while let Some(result) = futures.next().await {
if let (Ok(get_keys_response), origin) = result {
let result = db
.globals
.add_signing_key(&origin, get_keys_response.server_key.clone())?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect::<BTreeMap<_, _>>();
pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?
.insert(origin.to_string(), result);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::{add_port_to_hostname, get_ip_with_port, FedDest};