improvement: make state res actually work

next
Timo Kösters 2021-03-13 16:30:12 +01:00
parent 0d55964d24
commit 6da40225bb
No known key found for this signature in database
GPG Key ID: 24DA7517711A2BA4
13 changed files with 537 additions and 541 deletions

97
Cargo.lock generated
View File

@ -187,6 +187,7 @@ dependencies = [
"log",
"opentelemetry",
"opentelemetry-jaeger",
"pretty_env_logger",
"rand",
"regex",
"reqwest",
@ -383,6 +384,19 @@ dependencies = [
"syn",
]
[[package]]
name = "env_logger"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "figment"
version = "0.10.3"
@ -664,6 +678,15 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
[[package]]
name = "hyper"
version = "0.14.4"
@ -688,6 +711,21 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64"
dependencies = [
"futures-util",
"hyper",
"log",
"rustls",
"tokio",
"tokio-rustls",
"webpki",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
@ -1259,6 +1297,16 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "pretty_env_logger"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d"
dependencies = [
"env_logger",
"log",
]
[[package]]
name = "proc-macro-crate"
version = "0.1.5"
@ -1454,6 +1502,7 @@ dependencies = [
"http",
"http-body",
"hyper",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
@ -1463,14 +1512,17 @@ dependencies = [
"native-tls",
"percent-encoding",
"pin-project-lite",
"rustls",
"serde",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webpki-roots",
"winreg 0.7.0",
]
@ -1570,7 +1622,6 @@ dependencies = [
[[package]]
name = "ruma"
version = "0.0.2"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"assign",
"js_int",
@ -1590,7 +1641,6 @@ dependencies = [
[[package]]
name = "ruma-api"
version = "0.17.0-alpha.2"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"http",
"percent-encoding",
@ -1605,7 +1655,6 @@ dependencies = [
[[package]]
name = "ruma-api-macros"
version = "0.17.0-alpha.2"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -1616,7 +1665,6 @@ dependencies = [
[[package]]
name = "ruma-appservice-api"
version = "0.2.0-alpha.2"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"ruma-api",
"ruma-common",
@ -1630,7 +1678,6 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.10.0-alpha.2"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"assign",
"http",
@ -1649,7 +1696,6 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.3.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"js_int",
"maplit",
@ -1662,7 +1708,6 @@ dependencies = [
[[package]]
name = "ruma-events"
version = "0.22.0-alpha.2"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"js_int",
"ruma-common",
@ -1676,7 +1721,6 @@ dependencies = [
[[package]]
name = "ruma-events-macros"
version = "0.22.0-alpha.2"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -1687,7 +1731,6 @@ dependencies = [
[[package]]
name = "ruma-federation-api"
version = "0.1.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"js_int",
"ruma-api",
@ -1702,7 +1745,6 @@ dependencies = [
[[package]]
name = "ruma-identifiers"
version = "0.18.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"paste",
"rand",
@ -1716,7 +1758,6 @@ dependencies = [
[[package]]
name = "ruma-identifiers-macros"
version = "0.18.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"proc-macro2",
"quote",
@ -1727,12 +1768,10 @@ dependencies = [
[[package]]
name = "ruma-identifiers-validation"
version = "0.2.0"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
[[package]]
name = "ruma-identity-service-api"
version = "0.0.1"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"ruma-api",
"ruma-common",
@ -1745,7 +1784,6 @@ dependencies = [
[[package]]
name = "ruma-push-gateway-api"
version = "0.0.1"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"js_int",
"ruma-api",
@ -1760,7 +1798,6 @@ dependencies = [
[[package]]
name = "ruma-serde"
version = "0.3.0"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"form_urlencoded",
"itoa",
@ -1773,7 +1810,6 @@ dependencies = [
[[package]]
name = "ruma-serde-macros"
version = "0.3.0"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"proc-macro-crate",
"proc-macro2",
@ -1784,7 +1820,6 @@ dependencies = [
[[package]]
name = "ruma-signatures"
version = "0.6.0-alpha.1"
source = "git+https://github.com/ruma/ruma?rev=0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc#0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc"
dependencies = [
"base64 0.13.0",
"ring",
@ -2051,7 +2086,6 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483"
[[package]]
name = "state-res"
version = "0.1.0"
source = "git+https://github.com/ruma/state-res?branch=main#d34a78c5b66de419862d9e592bde8e0007111ebd"
dependencies = [
"itertools",
"log",
@ -2136,6 +2170,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.24"
@ -2656,6 +2699,15 @@ dependencies = [
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82015b7e0b8bad8185994674a13a93306bea76cf5a16c5a181382fd3a5ec2376"
dependencies = [
"webpki",
]
[[package]]
name = "weezl"
version = "0.1.4"
@ -2684,6 +2736,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View File

@ -18,16 +18,16 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "93e62c86e
#rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] }
# Used for matrix spec type definitions and helpers
ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" }
#ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "0a10afe6dacc2b7a50a8002c953d10b7fb4e37bc" }
# ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" }
# ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
# Used when doing state resolution
# state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] }
# TODO: remove the gen-eventid feature
state-res = { git = "https://github.com/ruma/state-res", branch = "main", features = ["unstable-pre-spec", "gen-eventid"] }
#state-res = { git = "https://github.com/ruma/state-res", branch = "main", features = ["unstable-pre-spec", "gen-eventid"] }
# state-res = { git = "https://github.com/ruma/state-res", rev = "791c66d73cf064d09db0cdf767d5fef43a343425", features = ["unstable-pre-spec", "gen-eventid"] }
# state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
# Used for long polling and federation sender, should be the same as rocket::tokio
tokio = "1.2.0"
@ -50,7 +50,7 @@ rand = "0.8.3"
# Used to hash passwords
rust-argon2 = "0.8.3"
# Used to send requests
reqwest = "0.11.1"
reqwest = { version = "0.11.1", features = ["rustls-tls"] }
# Used for conduit::Error type
thiserror = "1.0.24"
# Used to generate thumbnails for images
@ -71,6 +71,7 @@ opentelemetry = "0.12.0"
tracing-subscriber = "0.2.16"
tracing-opentelemetry = "0.11.0"
opentelemetry-jaeger = "0.11.0"
pretty_env_logger = "0.4.0"
[features]
default = ["conduit_bin"]

View File

@ -455,16 +455,9 @@ pub async fn register_route(
db.rooms.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMessage,
content: serde_json::to_value(message::MessageEventContent::Text(
message::TextMessageEventContent {
body: "Thanks for trying out Conduit! This software is still in development, so expect many bugs and missing features. If you have federation enabled, you can join the Conduit chat room by typing `/join #conduit:matrix.org`. **Important: Please don't join any other Matrix rooms over federation without permission from the room's admins.** Some actions might trigger bugs in other server implementations, breaking the chat for everyone else.".to_owned(),
formatted: Some(message::FormattedBody {
format: message::MessageFormat::Html,
body: "Thanks for trying out Conduit! This software is still in development, so expect many bugs and missing features. If you have federation enabled, you can join the Conduit chat room by typing <code>/join #conduit:matrix.org</code>. <strong>Important: Please don't join any other Matrix rooms over federation without permission from the room's admins.</strong> Some actions might trigger bugs in other server implementations, breaking the chat for everyone else.".to_owned(),
}),
relates_to: None,
new_content: None,
},
content: serde_json::to_value(message::MessageEventContent::text_html(
"Thanks for trying out Conduit! This software is still in development, so expect many bugs and missing features. If you have federation enabled, you can join the Conduit chat room by typing `/join #conduit:matrix.org`. **Important: Please don't join any other Matrix rooms over federation without permission from the room's admins.** Some actions might trigger bugs in other server implementations, breaking the chat for everyone else.".to_owned(),
"Thanks for trying out Conduit! This software is still in development, so expect many bugs and missing features. If you have federation enabled, you can join the Conduit chat room by typing <code>/join #conduit:matrix.org</code>. <strong>Important: Please don't join any other Matrix rooms over federation without permission from the room's admins.</strong> Some actions might trigger bugs in other server implementations, breaking the chat for everyone else.".to_owned(),
))
.expect("event is valid, we just created it"),
unsigned: None,

View File

@ -1,5 +1,10 @@
use crate::ConduitResult;
use ruma::{api::client::r0::capabilities::get_capabilities, RoomVersionId};
use ruma::{
api::client::r0::capabilities::{
get_capabilities, Capabilities, RoomVersionStability, RoomVersionsCapability,
},
RoomVersionId,
};
use std::collections::BTreeMap;
#[cfg(feature = "conduit_bin")]
@ -12,24 +17,14 @@ use rocket::get;
#[tracing::instrument]
pub async fn get_capabilities_route() -> ConduitResult<get_capabilities::Response> {
let mut available = BTreeMap::new();
available.insert(
RoomVersionId::Version5,
get_capabilities::RoomVersionStability::Stable,
);
available.insert(
RoomVersionId::Version6,
get_capabilities::RoomVersionStability::Stable,
);
available.insert(RoomVersionId::Version5, RoomVersionStability::Stable);
available.insert(RoomVersionId::Version6, RoomVersionStability::Stable);
Ok(get_capabilities::Response {
capabilities: get_capabilities::Capabilities {
change_password: get_capabilities::ChangePasswordCapability::default(), // enabled by default
room_versions: get_capabilities::RoomVersionsCapability {
default: RoomVersionId::Version6,
available,
},
custom_capabilities: BTreeMap::new(),
},
}
.into())
let mut capabilities = Capabilities::new();
capabilities.room_versions = RoomVersionsCapability {
default: RoomVersionId::Version6,
available,
};
Ok(get_capabilities::Response { capabilities }.into())
}

View File

@ -23,7 +23,7 @@ pub async fn set_global_account_data_route(
) -> ConduitResult<set_global_account_data::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let content = serde_json::from_str::<serde_json::Value>(body.data.get())
let data = serde_json::from_str(body.data.get())
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?;
let event_type = body.event_type.to_string();
@ -33,10 +33,7 @@ pub async fn set_global_account_data_route(
sender_user,
event_type.clone().into(),
&BasicEvent {
content: CustomEventContent {
event_type,
json: content,
},
content: CustomEventContent { event_type, data },
},
&db.globals,
)?;

View File

@ -4,7 +4,7 @@ use crate::{
pdu::{PduBuilder, PduEvent},
utils, ConduitResult, Database, Error, Result, Ruma,
};
use log::warn;
use log::{info, warn};
use ruma::{
api::{
client::{
@ -21,11 +21,9 @@ use ruma::{
serde::{to_canonical_value, CanonicalJsonObject, Raw},
EventId, RoomId, RoomVersionId, ServerName, UserId,
};
// use state_res::Event;
use std::{
collections::{BTreeMap, HashSet},
collections::{BTreeMap, HashMap},
convert::TryFrom,
iter,
sync::Arc,
};
@ -439,6 +437,7 @@ pub async fn joined_members_route(
Ok(joined_members::Response { joined }.into())
}
#[tracing::instrument(skip(db))]
async fn join_room_by_id_helper(
db: &Database,
sender_user: Option<&UserId>,
@ -566,23 +565,22 @@ async fn join_room_by_id_helper(
Ok((event_id, value))
};
let room_state = send_join_response.room_state.state.iter().map(add_event_id);
let count = db.globals.next_count()?;
let _state_events = room_state
.clone()
.map(|pdu: Result<(EventId, CanonicalJsonObject)>| Ok(pdu?.0))
.chain(iter::once(Ok(event_id.clone()))) // Add join event we just created
.collect::<Result<HashSet<EventId>>>()?;
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
let auth_chain = send_join_response
let pdu = PduEvent::from_id_val(&event_id, join_event.clone())
.map_err(|_| Error::BadServerResponse("Invalid PDU in send_join response."))?;
let mut state = HashMap::new();
for pdu in send_join_response
.room_state
.auth_chain
.state
.iter()
.map(add_event_id);
let mut event_map = room_state
.chain(auth_chain)
.chain(iter::once(Ok((event_id, join_event)))) // Add join event we just created
.map(add_event_id)
.map(|r| {
let (event_id, value) = r?;
PduEvent::from_id_val(&event_id, value.clone())
@ -592,97 +590,78 @@ async fn join_room_by_id_helper(
Error::BadServerResponse("Invalid PDU in send_join response.")
})
})
.collect::<Result<BTreeMap<EventId, Arc<PduEvent>>>>()?;
let control_events = event_map
.values()
.filter(|pdu| state_res::is_power_event(pdu))
.map(|pdu| pdu.event_id.clone())
.collect::<Vec<_>>();
// These events are not guaranteed to be sorted but they are resolved according to spec
// we auth them anyways to weed out faulty/malicious server. The following is basically the
// full state resolution algorithm.
let event_ids = event_map.keys().cloned().collect::<Vec<_>>();
let sorted_control_events = state_res::StateResolution::reverse_topological_power_sort(
&room_id,
&control_events,
&mut event_map,
&event_ids,
);
// Auth check each event against the "partial" state created by the preceding events
let resolved_control_events = state_res::StateResolution::iterative_auth_check(
room_id,
&RoomVersionId::Version6,
&sorted_control_events,
&BTreeMap::new(), // We have no "clean/resolved" events to add (these extend the `resolved_control_events`)
&mut event_map,
)
.expect("iterative auth check failed on resolved events");
// This removes the control events that failed auth, leaving the resolved
// to be mainline sorted. In the actual `state_res::StateResolution::resolve`
// function both are removed since these are all events we don't know of
// we must keep track of everything to add to our DB.
let events_to_sort = event_map
.keys()
.filter(|id| {
!sorted_control_events.contains(id)
|| resolved_control_events.values().any(|rid| *id == rid)
})
.cloned()
.collect::<Vec<_>>();
let power_level =
resolved_control_events.get(&(EventType::RoomPowerLevels, Some("".to_string())));
// Sort the remaining non control events
let sorted_event_ids = state_res::StateResolution::mainline_sort(
room_id,
&events_to_sort,
power_level,
&mut event_map,
);
let resolved_events = state_res::StateResolution::iterative_auth_check(
room_id,
&RoomVersionId::Version6,
&sorted_event_ids,
&resolved_control_events,
&mut event_map,
)
.expect("iterative auth check failed on resolved events");
// filter the events that failed the auth check keeping the remaining events
// sorted correctly
for ev_id in sorted_event_ids
.iter()
.filter(|id| resolved_events.values().any(|rid| rid == *id))
{
let pdu = event_map
.get(ev_id)
.expect("Found event_id in sorted events that is not in resolved state");
let (id, pdu) = pdu?;
info!("adding {} to outliers: {:#?}", id, pdu);
db.rooms.add_pdu_outlier(&pdu)?;
if let Some(state_key) = &pdu.state_key {
if pdu.kind == EventType::RoomMember {
let target_user_id = UserId::try_from(state_key.clone()).map_err(|_| {
Error::BadServerResponse("Invalid user id in send_join response.")
})?;
// We do not rebuild the PDU in this case only insert to DB
let count = db.globals.next_count()?;
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
let hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
db.rooms.append_pdu(
&pdu,
utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"),
count,
pdu_id.clone().into(),
&[pdu.event_id.clone()],
&db,
)?;
db.rooms.set_room_state(room_id, &hash)?;
// Update our membership info, we do this here incase a user is invited
// and immediately leaves we need the DB to record the invite event for auth
db.rooms.update_membership(
&pdu.room_id,
&target_user_id,
serde_json::from_value::<member::MemberEventContent>(pdu.content.clone())
.map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid member event content.",
)
})?,
&pdu.sender,
&db.account_data,
&db.globals,
)?;
}
let mut long_id = room_id.as_bytes().to_vec();
long_id.push(0xff);
long_id.extend_from_slice(id.as_bytes());
state.insert((pdu.kind.clone(), state_key.clone()), long_id);
}
}
state.insert(
(
pdu.kind.clone(),
pdu.state_key.clone().expect("join event has state key"),
),
pdu_id.clone(),
);
db.rooms.force_state(room_id, state, &db.globals)?;
for pdu in send_join_response
.room_state
.auth_chain
.iter()
.map(add_event_id)
.map(|r| {
let (event_id, value) = r?;
PduEvent::from_id_val(&event_id, value.clone())
.map(|ev| (event_id, Arc::new(ev)))
.map_err(|e| {
warn!("{:?}: {}", value, e);
Error::BadServerResponse("Invalid PDU in send_join response.")
})
})
{
let (id, pdu) = pdu?;
info!("adding {} to outliers: {:#?}", id, pdu);
db.rooms.add_pdu_outlier(&pdu)?;
}
db.rooms.append_pdu(
&pdu,
utils::to_canonical_object(&pdu).expect("Pdu is valid canonical object"),
db.globals.next_count()?,
pdu_id.into(),
&[pdu.event_id.clone()],
db,
)?;
} else {
let event = member::MemberEventContent {
membership: member::MembershipState::Join,

View File

@ -102,9 +102,14 @@ pub async fn sync_events_route(
// since and the current room state, meaning there should be no updates.
// The inner Option is None when there is an event, but there is no state hash associated
// with it. This can happen for the RoomCreate event, so all updates should arrive.
let first_pdu_after_since = db.rooms.pdus_after(sender_user, &room_id, since).next();
let first_pdu_before_since = db.rooms.pdus_until(sender_user, &room_id, since).next();
let pdus_after_since = db
.rooms
.pdus_after(sender_user, &room_id, since)
.next()
.is_some();
let since_state_hash = first_pdu_after_since
let since_state_hash = first_pdu_before_since
.as_ref()
.map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?);
@ -114,7 +119,7 @@ pub async fn sync_events_route(
invited_member_count,
joined_since_last_sync,
state_events,
) = if since_state_hash != None && Some(&current_state_hash) != since_state_hash.as_ref() {
) = if pdus_after_since && Some(&current_state_hash) != since_state_hash.as_ref() {
let current_state = db.rooms.room_state_full(&room_id)?;
let current_members = current_state
.iter()
@ -138,9 +143,9 @@ pub async fn sync_events_route(
// Calculations:
let new_encrypted_room =
encrypted_room && since_encryption.map_or(false, |encryption| encryption.is_none());
encrypted_room && since_encryption.map_or(true, |encryption| encryption.is_none());
let send_member_count = since_state.as_ref().map_or(false, |since_state| {
let send_member_count = since_state.as_ref().map_or(true, |since_state| {
since_state.as_ref().map_or(true, |since_state| {
current_members.len()
!= since_state
@ -179,7 +184,7 @@ pub async fn sync_events_route(
let since_membership =
since_state
.as_ref()
.map_or(MembershipState::Join, |since_state| {
.map_or(MembershipState::Leave, |since_state| {
since_state
.as_ref()
.and_then(|since_state| {
@ -221,7 +226,7 @@ pub async fn sync_events_route(
}
}
let joined_since_last_sync = since_sender_member.map_or(false, |member| {
let joined_since_last_sync = since_sender_member.map_or(true, |member| {
member.map_or(true, |member| member.membership != MembershipState::Join)
});
@ -310,7 +315,7 @@ pub async fn sync_events_route(
(None, None, Vec::new())
};
let state_events = if joined_since_last_sync {
let state_events = if dbg!(joined_since_last_sync) {
current_state
.into_iter()
.map(|(_, pdu)| pdu.to_sync_state_event())

View File

@ -165,9 +165,8 @@ impl Database {
stateid_pduid: db.open_tree("stateid_pduid")?,
pduid_statehash: db.open_tree("pduid_statehash")?,
roomid_statehash: db.open_tree("roomid_statehash")?,
roomeventid_outlierpdu: db.open_tree("roomeventid_outlierpdu")?,
eventid_outlierpdu: db.open_tree("roomeventid_outlierpdu")?,
prevevent_parent: db.open_tree("prevevent_parent")?,
roomeventid_outlierpducount: db.open_tree("roomeventid_outlierpducount")?,
},
account_data: account_data::AccountData {
roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?,

View File

@ -24,7 +24,7 @@ pub struct Globals {
reqwest_client: reqwest::Client,
dns_resolver: TokioAsyncResolver,
jwt_decoding_key: Option<jsonwebtoken::DecodingKey<'static>>,
pub(super) servertimeout_signingkey: sled::Tree, // ServerName -> algorithm:key + pubkey
pub(super) servertimeout_signingkey: sled::Tree, // ServerName + Timeout Timestamp -> algorithm:key + pubkey
}
impl Globals {
@ -157,37 +157,31 @@ impl Globals {
///
/// This doesn't actually check that the keys provided are newer than the old set.
pub fn add_signing_key(&self, origin: &ServerName, keys: &ServerSigningKeys) -> Result<()> {
// Remove outdated keys
let now = crate::utils::millis_since_unix_epoch();
for item in self.servertimeout_signingkey.scan_prefix(origin.as_bytes()) {
let (k, _) = item?;
let valid_until = k
.splitn(2, |&b| b == 0xff)
.nth(1)
.map(crate::utils::u64_from_bytes)
.ok_or_else(|| Error::bad_database("Invalid signing keys."))?
.map_err(|_| Error::bad_database("Invalid signing key valid until bytes"))?;
let mut key1 = origin.as_bytes().to_vec();
key1.push(0xff);
if now > valid_until {
self.servertimeout_signingkey.remove(k)?;
}
}
let mut key2 = key1.clone();
let mut key = origin.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(
&(keys
.valid_until_ts
.duration_since(std::time::UNIX_EPOCH)
.expect("time is valid")
.as_millis() as u64)
.to_be_bytes(),
);
let ts = keys
.valid_until_ts
.duration_since(std::time::UNIX_EPOCH)
.expect("time is valid")
.as_millis() as u64;
key1.extend_from_slice(&ts.to_be_bytes());
key2.extend_from_slice(&(ts + 1).to_be_bytes());
self.servertimeout_signingkey.insert(
key,
key1,
serde_json::to_vec(&keys.verify_keys).expect("ServerSigningKeys are a valid string"),
)?;
self.servertimeout_signingkey.insert(
key2,
serde_json::to_vec(&keys.old_verify_keys)
.expect("ServerSigningKeys are a valid string"),
)?;
Ok(())
}
@ -196,7 +190,10 @@ impl Globals {
&self,
origin: &ServerName,
) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> {
let mut response = BTreeMap::new();
let now = crate::utils::millis_since_unix_epoch();
for item in self.servertimeout_signingkey.scan_prefix(origin.as_bytes()) {
let (k, bytes) = item?;
let valid_until = k
@ -207,10 +204,11 @@ impl Globals {
.map_err(|_| Error::bad_database("Invalid signing key valid until bytes"))?;
// If these keys are still valid use em!
if valid_until > now {
return serde_json::from_slice(&bytes)
.map_err(|_| Error::bad_database("Invalid BTreeMap<> of signing keys"));
let btree: BTreeMap<_, _> = serde_json::from_slice(&bytes)
.map_err(|_| Error::bad_database("Invalid BTreeMap<> of signing keys"))?;
response.extend(btree);
}
}
Ok(BTreeMap::default())
Ok(response)
}
}

View File

@ -3,7 +3,7 @@ mod edus;
pub use edus::RoomEdus;
use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result};
use log::error;
use log::{error, info, warn};
use regex::Regex;
use ring::digest;
use ruma::{
@ -71,10 +71,7 @@ pub struct Rooms {
/// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
pub(super) roomeventid_outlierpdu: sled::Tree,
/// RoomId + EventId -> count of the last known pdu when the outlier was inserted.
/// This allows us to skip any state snapshots that would for sure not have the outlier.
pub(super) roomeventid_outlierpducount: sled::Tree,
pub(super) eventid_outlierpdu: sled::Tree,
/// RoomId + EventId -> Parent PDU EventId.
pub(super) prevevent_parent: sled::Tree,
@ -89,19 +86,21 @@ impl Rooms {
room_id: &RoomId,
state_hash: &StateHashId,
) -> Result<BTreeMap<(EventType, String), PduEvent>> {
self.stateid_pduid
let r = self
.stateid_pduid
.scan_prefix(&state_hash)
.values()
.map(|pduid_short| {
let mut pduid = room_id.as_bytes().to_vec();
pduid.push(0xff);
pduid.extend_from_slice(&pduid_short?);
match self.pduid_pdu.get(&pduid)? {
.map(|short_id| {
let short_id = short_id?;
let mut long_id = room_id.as_bytes().to_vec();
long_id.push(0xff);
long_id.extend_from_slice(&short_id);
match self.pduid_pdu.get(&long_id)? {
Some(b) => serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db.")),
None => self
.roomeventid_outlierpdu
.get(pduid)?
.eventid_outlierpdu
.get(short_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
@ -124,7 +123,9 @@ impl Rooms {
pdu,
))
})
.collect()
.collect();
r
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
@ -140,6 +141,8 @@ impl Rooms {
key.push(0xff);
key.extend_from_slice(&state_key.as_bytes());
info!("Looking for {} {:?}", event_type, state_key);
let short = self.statekey_short.get(&key)?;
if let Some(short) = short {
@ -147,32 +150,40 @@ impl Rooms {
stateid.push(0xff);
stateid.extend_from_slice(&short);
info!("trying to find pduid/eventid. short: {:?}", stateid);
self.stateid_pduid
.get(&stateid)?
.map_or(Ok(None), |pdu_id_short| {
let mut pdu_id = room_id.as_bytes().to_vec();
pdu_id.push(0xff);
pdu_id.extend_from_slice(&pdu_id_short);
.map_or(Ok(None), |short_id| {
info!("found in stateid_pduid");
let mut long_id = room_id.as_bytes().to_vec();
long_id.push(0xff);
long_id.extend_from_slice(&short_id);
Ok::<_, Error>(Some((
pdu_id.clone().into(),
match self.pduid_pdu.get(&pdu_id)? {
Some(b) => serde_json::from_slice::<PduEvent>(&b)
Ok::<_, Error>(Some(match self.pduid_pdu.get(&long_id)? {
Some(b) => (
long_id.clone().into(),
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
None => self
.roomeventid_outlierpdu
.get(pdu_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})??,
},
)))
),
None => {
info!("looking in outliers");
(
short_id.clone().into(),
self.eventid_outlierpdu
.get(&short_id)?
.map(|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})??,
)
}
}))
})
} else {
info!("short id not found");
Ok(None)
}
}
@ -215,6 +226,8 @@ impl Rooms {
.ok_or_else(|| Error::bad_database("Saved auth event with no state key."))?,
)? {
events.insert((event_type, state_key), pdu);
} else {
warn!("Could not find {} {:?} in state", event_type, state_key);
}
}
Ok(events)
@ -253,11 +266,11 @@ impl Rooms {
globals: &super::globals::Globals,
) -> Result<()> {
let state_hash =
self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::<Vec<_>>())?;
self.calculate_hash(&state.values().map(|long_id| &**long_id).collect::<Vec<_>>())?;
let mut prefix = state_hash.to_vec();
prefix.push(0xff);
for ((event_type, state_key), id_long) in state {
for ((event_type, state_key), long_id) in state {
let mut statekey = event_type.as_ref().as_bytes().to_vec();
statekey.push(0xff);
statekey.extend_from_slice(&state_key.as_bytes());
@ -273,16 +286,13 @@ impl Rooms {
}
};
// Because of outliers this could also be an eventID but that
// is handled by `state_full`
let pdu_id_short = id_long
.splitn(2, |&b| b == 0xff)
.nth(1)
.ok_or_else(|| Error::bad_database("Invalid pduid in state."))?;
// If it's a pdu id we remove the room id, if it's an event id we leave it the same
let short_id = long_id.splitn(2, |&b| b == 0xff).nth(1).unwrap_or(&long_id);
let mut state_id = prefix.clone();
state_id.extend_from_slice(&short.to_be_bytes());
self.stateid_pduid.insert(state_id, pdu_id_short)?;
info!("inserting {:?} into {:?}", short_id, state_id);
self.stateid_pduid.insert(state_id, short_id)?;
}
self.roomid_statehash
@ -348,20 +358,19 @@ impl Rooms {
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<serde_json::Value>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| {
Ok(Some(
serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? {
Some(b) => b,
None => self
.roomeventid_outlierpdu
.get(event_id.as_bytes())?
.ok_or_else(|| {
Error::bad_database("Event is not in pdu tree or outliers.")
})?,
})
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
.map_or_else::<Result<_>, _, _>(
|| Ok(self.eventid_outlierpdu.get(event_id.as_bytes())?),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pdu| {
Ok(serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?)
})
.transpose()
}
/// Returns the pdu's id.
@ -371,24 +380,31 @@ impl Rooms {
.map_or(Ok(None), |pdu_id| Ok(Some(pdu_id)))
}
pub fn get_long_id(&self, event_id: &EventId) -> Result<Vec<u8>> {
Ok(self
.get_pdu_id(event_id)?
.map_or_else(|| event_id.as_bytes().to_vec(), |pduid| pduid.to_vec()))
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| {
Ok(Some(
serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? {
Some(b) => b,
None => match self.roomeventid_outlierpdu.get(event_id.as_bytes())? {
Some(b) => b,
None => return Ok(None),
},
})
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
.map_or_else::<Result<_>, _, _>(
|| Ok(self.eventid_outlierpdu.get(event_id.as_bytes())?),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pdu| {
Ok(serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?)
})
.transpose()
}
/// Returns the pdu.
@ -484,7 +500,7 @@ impl Rooms {
/// Returns the pdu from the outlier tree.
pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.roomeventid_outlierpdu
self.eventid_outlierpdu
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
@ -494,25 +510,12 @@ impl Rooms {
/// Append the PDU as an outlier.
///
/// Any event given to this will be processed (state-res) on another thread.
pub fn append_pdu_outlier(&self, pdu: &PduEvent) -> Result<()> {
log::info!(
"Number of outlier pdu's {}",
self.roomeventid_outlierpdu.len()
);
let mut key = pdu.room_id().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(pdu.event_id().as_bytes());
self.eventid_pduid
.insert(pdu.event_id().as_bytes(), key.as_slice())?;
self.roomeventid_outlierpdu.insert(
&key,
pub fn add_pdu_outlier(&self, pdu: &PduEvent) -> Result<()> {
self.eventid_outlierpdu.insert(
&pdu.event_id.as_bytes(),
&*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"),
)?;
self.roomeventid_outlierpducount
.insert(&key, &self.latest_pdu_count(pdu.room_id())?.to_be_bytes())?;
Ok(())
}
@ -557,50 +560,6 @@ impl Rooms {
}
}
// We no longer keep this pdu as an outlier
let mut key = pdu.room_id().as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(pdu.event_id().as_bytes());
if self.roomeventid_outlierpdu.remove(&key)?.is_some() {
if let Some(state_key) = pdu.state_key.as_deref() {
let mut statekey = pdu.kind().as_ref().as_bytes().to_vec();
statekey.extend_from_slice(state_key.as_bytes());
let short = match self.statekey_short.get(&statekey)? {
Some(short) => utils::u64_from_bytes(&short).map_err(|_| {
Error::bad_database("Invalid short bytes in statekey_short.")
})?,
None => {
error!(
"This event has been inserted into the state snapshot tree previously."
);
let short = db.globals.next_count()?;
self.statekey_short
.insert(&statekey, &short.to_be_bytes())?;
short
}
};
let mut start = pdu.room_id().as_bytes().to_vec();
start.extend_from_slice(
&self
.roomeventid_outlierpducount
.get(&key)?
.unwrap_or_default(),
);
for hash in self.pduid_statehash.range(start..).values() {
let mut hash = hash?.to_vec();
hash.extend_from_slice(&short.to_be_bytes());
let _ = dbg!(self.stateid_pduid.compare_and_swap(
hash,
Some(pdu.event_id().as_bytes()),
Some(pdu_id.as_ref()),
)?);
}
}
}
// We must keep track of all events that have been referenced.
for leaf in leaves {
let mut key = pdu.room_id().as_bytes().to_vec();
@ -1275,7 +1234,7 @@ impl Rooms {
}
/// Update current membership data.
fn update_membership(
pub fn update_membership(
&self,
room_id: &RoomId,
user_id: &UserId,

View File

@ -346,6 +346,8 @@ impl Sending {
.collect::<Vec<_>>();
let permit = maximum_requests.acquire().await;
info!("sending pdus to {}: {:#?}", server, pdu_jsons);
let response = server_server::send_request(
&globals,
&*server,
@ -361,7 +363,10 @@ impl Sending {
},
)
.await
.map(|_response| (server.clone(), is_appservice))
.map(|response| {
info!("server response: {:?}", response);
(server.clone(), is_appservice)
})
.map_err(|e| (server, is_appservice, e));
drop(permit);

View File

@ -204,6 +204,8 @@ async fn main() {
rocket.launch().await.unwrap();
} else {
pretty_env_logger::init();
let root = span!(tracing::Level::INFO, "app_start", work_units = 2);
let _enter = root.enter();

View File

@ -509,7 +509,7 @@ pub async fn send_transaction_message_route<'a>(
return Err(Error::bad_config("Federation is disabled."));
}
// dbg!(&*body);
info!("Incoming PDUs: {:?}", &body.pdus);
for edu in &body.edus {
match serde_json::from_str::<send_transaction_message::v1::Edu>(edu.json().get()) {
@ -600,37 +600,11 @@ pub async fn send_transaction_message_route<'a>(
// events over federation. For example, the Federation API's /send endpoint would
// discard the event whereas the Client Server API's /send/{eventType} endpoint
// would return a M_BAD_JSON error.
'main_pdu_loop: for (event_id, room_id, value) in pdus_to_resolve {
'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve {
info!("Working on incoming pdu: {:?}", value);
let server_name = &body.body.origin;
let mut pub_key_map = BTreeMap::new();
if let Some(CanonicalJsonValue::String(sender)) = value.get("sender") {
let sender =
UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field");
let origin = sender.server_name();
let keys = match fetch_signing_keys(&db, &room_id, origin).await {
Ok(keys) => keys,
Err(_) => {
resolved_map.insert(
event_id,
Err("Could not find signing keys for this server".to_string()),
);
continue;
}
};
pub_key_map.insert(
origin.to_string(),
keys.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect(),
);
} else {
resolved_map.insert(event_id, Err("No field `signatures` in JSON".to_string()));
continue;
}
// TODO: make this persist but not a DB Tree...
// This is all the auth_events that have been recursively fetched so they don't have to be
// deserialized over and over again. This could potentially also be some sort of trie (suffix tree)
@ -645,11 +619,11 @@ pub async fn send_transaction_message_route<'a>(
// 7. if not timeline event: stop
// TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
// the events found in step 8 can be authed/resolved and appended to the DB
let (pdu, previous): (Arc<PduEvent>, Vec<Arc<PduEvent>>) = match validate_event(
let (pdu, previous_create): (Arc<PduEvent>, Option<Arc<PduEvent>>) = match validate_event(
&db,
value,
event_id.clone(),
&pub_key_map,
&mut pub_key_map,
server_name,
// All the auth events gathered will be here
&mut auth_cache,
@ -662,15 +636,11 @@ pub async fn send_transaction_message_route<'a>(
continue;
}
};
let single_prev = if previous.len() == 1 {
previous.first().cloned()
} else {
None
};
info!("Validated event.");
// 6. persist the event as an outlier.
db.rooms.append_pdu_outlier(&pdu)?;
db.rooms.add_pdu_outlier(&pdu)?;
info!("Added pdu as outlier.");
// Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
// the checks in this list starting at 1. These are not timeline events.
@ -679,6 +649,7 @@ pub async fn send_transaction_message_route<'a>(
//
// TODO: if we know the prev_events of the incoming event we can avoid the request and build
// the state from a known point and resolve if > 1 prev_event
info!("Requesting state at event.");
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
match db
.sending
@ -693,14 +664,20 @@ pub async fn send_transaction_message_route<'a>(
.await
{
Ok(res) => {
let state = fetch_events(
info!("Fetching state events at event.");
let state = match fetch_events(
&db,
server_name,
&pub_key_map,
&mut pub_key_map,
&res.pdu_ids,
&mut auth_cache,
)
.await?;
.await
{
Ok(state) => state,
Err(_) => continue,
};
// Sanity check: there are no conflicting events in the state we received
let mut seen = BTreeSet::new();
for ev in &state {
@ -716,17 +693,21 @@ pub async fn send_transaction_message_route<'a>(
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect();
(
state,
fetch_events(
&db,
server_name,
&pub_key_map,
&res.auth_chain_ids,
&mut auth_cache,
)
.await?,
let incoming_auth_events = match fetch_events(
&db,
server_name,
&mut pub_key_map,
&res.auth_chain_ids,
&mut auth_cache,
)
.await
{
Ok(state) => state,
Err(_) => continue,
};
info!("Fetching auth events of state events at event.");
(state, incoming_auth_events)
}
Err(_) => {
resolved_map.insert(
@ -741,7 +722,7 @@ pub async fn send_transaction_message_route<'a>(
if !state_res::event_auth::auth_check(
&RoomVersionId::Version6,
&pdu,
single_prev.clone(),
previous_create.clone(),
&state_at_event,
None, // TODO: third party invite
)
@ -754,6 +735,7 @@ pub async fn send_transaction_message_route<'a>(
);
continue;
}
info!("Auth check succeeded.");
// End of step 10.
// 12. check if the event passes auth based on the "current state" of the room, if not "soft fail" it
@ -764,10 +746,12 @@ pub async fn send_transaction_message_route<'a>(
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect();
info!("current state: {:#?}", current_state);
if !state_res::event_auth::auth_check(
&RoomVersionId::Version6,
&pdu,
single_prev.clone(),
previous_create,
&current_state,
None,
)
@ -780,6 +764,7 @@ pub async fn send_transaction_message_route<'a>(
);
continue;
};
info!("Auth check with current state succeeded.");
// Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote)
@ -787,7 +772,10 @@ pub async fn send_transaction_message_route<'a>(
// calculate_forward_extremities takes care of adding the current state if not already in the state sets
// it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
let extremities = match calculate_forward_extremities(&db, &pdu).await {
Ok(fork_ids) => fork_ids,
Ok(fork_ids) => {
info!("Calculated new forward extremities: {:?}", fork_ids);
fork_ids
}
Err(_) => {
resolved_map.insert(event_id, Err("Failed to gather forward extremities".into()));
continue;
@ -836,7 +824,6 @@ pub async fn send_transaction_message_route<'a>(
// We do need to force an update to this rooms state
update_state = true;
// TODO: remove this is for current debugging Jan, 15 2021
let mut auth_events = vec![];
for map in &fork_states {
let mut state_auth = vec![];
@ -877,6 +864,8 @@ pub async fn send_transaction_message_route<'a>(
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)),
);
info!("auth events: {:?}", auth_cache);
let res = match state_res::StateResolution::resolve(
pdu.room_id(),
&RoomVersionId::Version6,
@ -927,6 +916,7 @@ pub async fn send_transaction_message_route<'a>(
// We use the `state_at_event` instead of `state_after` so we accurately
// represent the state for this event.
append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
info!("Appended incoming pdu.");
// Set the new room state to the resolved state
update_resolved_state(
@ -938,6 +928,7 @@ pub async fn send_transaction_message_route<'a>(
None
},
)?;
info!("Updated resolved state");
// Event has passed all auth/stateres checks
}
@ -962,17 +953,52 @@ type AsyncRecursiveResult<'a, T> = Pin<Box<dyn Future<Output = StdResult<T, Stri
/// 5. reject "due to auth events" if the event doesn't pass auth based on the auth events
/// 7. if not timeline event: stop
/// 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
#[tracing::instrument(skip(db))]
fn validate_event<'a>(
db: &'a Database,
value: CanonicalJsonObject,
event_id: EventId,
pub_key_map: &'a PublicKeyMap,
pub_key_map: &'a mut PublicKeyMap,
origin: &'a ServerName,
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
) -> AsyncRecursiveResult<'a, (Arc<PduEvent>, Vec<Arc<PduEvent>>)> {
) -> AsyncRecursiveResult<'a, (Arc<PduEvent>, Option<Arc<PduEvent>>)> {
Box::pin(async move {
for signature_server in match value
.get("signatures")
.ok_or_else(|| "No signatures in server response pdu.".to_string())?
{
CanonicalJsonValue::Object(map) => map,
_ => return Err("Invalid signatures object in server response pdu.".to_string()),
}
.keys()
{
info!("Fetching signing keys for {}", signature_server);
let keys = match fetch_signing_keys(
&db,
&Box::<ServerName>::try_from(&**signature_server).map_err(|_| {
"Invalid servername in signatures of server response pdu.".to_string()
})?,
)
.await
{
Ok(keys) => {
info!("Keys: {:?}", keys);
keys
}
Err(_) => {
return Err(
"Signature verification failed: Could not fetch signing key.".to_string(),
);
}
};
pub_key_map.insert(signature_server.clone(), keys);
info!("Fetched signing keys");
}
let mut val =
match ruma::signatures::verify_event(pub_key_map, &value, &RoomVersionId::Version6) {
match ruma::signatures::verify_event(pub_key_map, &value, &RoomVersionId::Version5) {
Ok(ver) => {
if let ruma::signatures::Verified::Signatures = ver {
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
@ -1000,26 +1026,34 @@ fn validate_event<'a>(
)
.map_err(|_| "Event is not a valid PDU".to_string())?;
info!("Fetching auth events.");
fetch_check_auth_events(db, origin, pub_key_map, &pdu.auth_events, auth_cache)
.await
.map_err(|e| e.to_string())?;
let pdu = Arc::new(pdu.clone());
/*
// 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
let previous = fetch_events(&db, origin, &pub_key_map, &pdu.prev_events, auth_cache)
info!("Fetching prev events.");
let previous = fetch_events(&db, origin, pub_key_map, &pdu.prev_events, auth_cache)
.await
.map_err(|e| e.to_string())?;
*/
// if the previous event was the create event special rules apply
let previous_create = if pdu.auth_events.len() == 1 && pdu.prev_events == pdu.auth_events {
auth_cache.get(&pdu.auth_events[0]).cloned()
} else {
None
};
// Check that the event passes auth based on the auth_events
info!("Checking auth.");
let is_authed = state_res::event_auth::auth_check(
&RoomVersionId::Version6,
&pdu,
if previous.len() == 1 {
previous.first().cloned()
} else {
None
},
previous_create.clone(),
&pdu.auth_events
.iter()
.map(|id| {
@ -1039,39 +1073,20 @@ fn validate_event<'a>(
return Err("Event has failed auth check with auth events".to_string());
}
Ok((pdu, previous))
info!("Validation successful.");
Ok((pdu, previous_create))
})
}
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
/// The check in `fetch_check_auth_events` is that a complete chain is found for the
/// events `auth_events`. If the chain is found to have any missing events it fails.
#[tracing::instrument(skip(db))]
async fn fetch_check_auth_events(
db: &Database,
origin: &ServerName,
key_map: &PublicKeyMap,
key_map: &mut PublicKeyMap,
event_ids: &[EventId],
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<()> {
let mut stack = event_ids.to_vec();
// DFS for auth event chain
while !stack.is_empty() {
let ev_id = stack.pop().unwrap();
if auth_cache.contains_key(&ev_id) {
continue;
}
// TODO: Batch these async calls so we can wait on multiple at once
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
.await
.map(|mut vec| {
vec.pop()
.ok_or_else(|| Error::Conflict("Event was not found in fetch_events"))
})??;
stack.extend(ev.auth_events());
}
fetch_events(db, origin, key_map, event_ids, auth_cache).await?;
Ok(())
}
@ -1086,44 +1101,58 @@ async fn fetch_check_auth_events(
///
/// If the event is unknown to the `auth_cache` it is added. This guarantees that any
/// event we need to know of will be present.
#[tracing::instrument(skip(db))]
pub(crate) async fn fetch_events(
db: &Database,
origin: &ServerName,
key_map: &PublicKeyMap,
key_map: &mut PublicKeyMap,
events: &[EventId],
auth_cache: &mut EventMap<Arc<PduEvent>>,
) -> Result<Vec<Arc<PduEvent>>> {
let mut pdus = vec![];
for id in events {
info!("Fetching event: {}", id);
let pdu = match auth_cache.get(id) {
Some(pdu) => pdu.clone(),
Some(pdu) => {
info!("Event found in cache");
pdu.clone()
}
// `get_pdu` checks the outliers tree for us
None => match db.rooms.get_pdu(&id)? {
Some(pdu) => Arc::new(pdu),
None => match db
.sending
.send_federation_request(
&db.globals,
origin,
get_event::v1::Request { event_id: &id },
)
.await
{
Ok(res) => {
let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu);
let (pdu, _) =
validate_event(db, value, event_id, key_map, origin, auth_cache)
.await
.map_err(|e| {
error!("{:?}", e);
Error::Conflict("Authentication of event failed")
})?;
Some(pdu) => {
info!("Event found in outliers");
Arc::new(pdu)
}
None => {
info!("Fetching event over federation");
match db
.sending
.send_federation_request(
&db.globals,
origin,
get_event::v1::Request { event_id: &id },
)
.await
{
Ok(res) => {
info!("Got event over federation: {:?}", res);
let (event_id, value) =
crate::pdu::gen_event_id_canonical_json(&res.pdu);
let (pdu, _) =
validate_event(db, value, event_id, key_map, origin, auth_cache)
.await
.map_err(|e| {
error!("ERROR: {:?}", e);
Error::Conflict("Authentication of event failed")
})?;
db.rooms.append_pdu_outlier(&pdu)?;
pdu
info!("Added fetched pdu as outlier.");
db.rooms.add_pdu_outlier(&pdu)?;
pdu
}
Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
}
Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
},
}
},
};
auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());
@ -1134,14 +1163,23 @@ pub(crate) async fn fetch_events(
/// Search the DB for the signing keys of the given server, if we don't have them
/// fetch them from the server and save to our DB.
#[tracing::instrument(skip(db))]
pub(crate) async fn fetch_signing_keys(
db: &Database,
room_id: &RoomId,
origin: &ServerName,
) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> {
) -> Result<BTreeMap<String, String>> {
let mut result = BTreeMap::new();
match db.globals.signing_keys_for(origin)? {
keys if !keys.is_empty() => Ok(keys),
keys if !keys.is_empty() => {
info!("we knew the signing keys already: {:?}", keys);
Ok(keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect())
}
_ => {
info!("Asking {} for it's signing key", origin);
match db
.sending
.send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
@ -1149,13 +1187,24 @@ pub(crate) async fn fetch_signing_keys(
{
Ok(keys) => {
db.globals.add_signing_key(origin, &keys.server_key)?;
Ok(keys.server_key.verify_keys)
result.extend(
keys.server_key
.verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
result.extend(
keys.server_key
.old_verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
return Ok(result);
}
_ => {
for server in db.rooms.room_servers(room_id).filter(
|ser| matches!(ser, Ok(s) if db.globals.trusted_servers().contains(s)),
) {
let server = server?;
for server in db.globals.trusted_servers() {
info!("Asking {} for {}'s signing key", server, origin);
if let Ok(keys) = db
.sending
.send_federation_request(
@ -1170,30 +1219,21 @@ pub(crate) async fn fetch_signing_keys(
)
.await
{
let mut trust = 0;
let keys: Vec<ServerSigningKeys> = keys.server_keys;
let key = keys.iter().fold(None, |mut key, next| {
if let Some(verified) = &key {
// rustc cannot elide this type for some reason
let v: &ServerSigningKeys = verified;
if v.verify_keys
.iter()
.zip(next.verify_keys.iter())
.all(|(a, b)| a.1.key == b.1.key)
{
trust += 1;
}
} else {
key = Some(next.clone())
}
key
});
if trust == (keys.len() - 1) && key.is_some() {
let k = key.unwrap();
info!("Got signing keys: {:?}", keys);
for k in keys.server_keys.into_iter() {
db.globals.add_signing_key(origin, &k)?;
return Ok(k.verify_keys);
result.extend(
k.verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
result.extend(
k.old_verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
}
return Ok(result);
}
}
Err(Error::BadServerResponse(
@ -1211,6 +1251,7 @@ pub(crate) async fn fetch_signing_keys(
/// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote).
///
/// The state snapshot of the incoming event __needs__ to be added to the resulting list.
#[tracing::instrument(skip(db))]
pub(crate) async fn calculate_forward_extremities(
db: &Database,
pdu: &PduEvent,
@ -1261,6 +1302,7 @@ pub(crate) async fn calculate_forward_extremities(
///
/// This guarantees that the incoming event will be in the state sets (at least our servers
/// and the sending server).
#[tracing::instrument(skip(db))]
pub(crate) async fn build_forward_extremity_snapshots(
db: &Database,
pdu: Arc<PduEvent>,
@ -1275,12 +1317,14 @@ pub(crate) async fn build_forward_extremity_snapshots(
let mut includes_current_state = false;
let mut fork_states = BTreeSet::new();
for id in current_leaves {
if id == &pdu.event_id {
continue;
}
match db.rooms.get_pdu_id(id)? {
// We can skip this because it is handled outside of this function
// The current server state and incoming event state are built to be
// the state after.
// This would be the incoming state from the server.
Some(_) if id == pdu.event_id() => {}
Some(pduid) if db.rooms.get_pdu_from_id(&pduid)?.is_some() => {
let state_hash = db
.rooms
@ -1308,40 +1352,7 @@ pub(crate) async fn build_forward_extremity_snapshots(
}
_ => {
error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind());
let res = db
.sending
.send_federation_request(
&db.globals,
origin,
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: id,
},
)
.await?;
// TODO: This only adds events to the auth_cache, there is for sure a better way to
// do this...
fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?;
let mut state_before =
fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache)
.await?
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
.collect::<StateMap<_>>();
if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache)
.await?
.pop()
{
let key = (pdu.kind.clone(), pdu.state_key());
state_before.insert(key, pdu);
}
// Now it's the state after
fork_states.insert(state_before);
return Err(Error::BadDatabase("Missing state snapshot."));
}
}
}
@ -1353,9 +1364,11 @@ pub(crate) async fn build_forward_extremity_snapshots(
fork_states.insert(current_state);
}
info!("Fork states: {:?}", fork_states);
Ok(fork_states)
}
#[tracing::instrument(skip(db))]
pub(crate) fn update_resolved_state(
db: &Database,
room_id: &RoomId,
@ -1366,22 +1379,14 @@ pub(crate) fn update_resolved_state(
if let Some(state) = state {
let mut new_state = HashMap::new();
for ((ev_type, state_k), pdu) in state {
match db.rooms.get_pdu_id(pdu.event_id())? {
Some(pduid) => {
new_state.insert(
(
ev_type,
state_k.ok_or_else(|| {
Error::Conflict("State contained non state event")
})?,
),
pduid.to_vec(),
);
}
None => {
error!("We are missing a state event for the current room state.");
}
}
let long_id = db.rooms.get_long_id(&pdu.event_id)?;
new_state.insert(
(
ev_type,
state_k.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
long_id,
);
}
db.rooms.force_state(room_id, new_state, &db.globals)?;
@ -1392,6 +1397,7 @@ pub(crate) fn update_resolved_state(
/// Append the incoming event setting the state snapshot to the state from the
/// server that sent the event.
#[tracing::instrument(skip(db))]
pub(crate) fn append_incoming_pdu(
db: &Database,
pdu: &PduEvent,
@ -1402,20 +1408,16 @@ pub(crate) fn append_incoming_pdu(
// We can tell if we need to do this based on wether state resolution took place or not
let mut new_state = HashMap::new();
for ((ev_type, state_k), state_pdu) in state {
match db.rooms.get_pdu_id(state_pdu.event_id())? {
Some(state_pduid) => {
new_state.insert(
(
ev_type.clone(),
state_k
.clone()
.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
state_pduid.to_vec(),
);
}
None => error!("We are missing a state event for the incoming event snapshot"),
}
let long_id = db.rooms.get_long_id(state_pdu.event_id())?;
new_state.insert(
(
ev_type.clone(),
state_k
.clone()
.ok_or_else(|| Error::Conflict("State contained non state event"))?,
),
long_id.to_vec(),
);
}
db.rooms