From c213769d9f8448b21ae414db62a68ea0044701cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 25 Mar 2021 23:55:40 +0100 Subject: [PATCH] improvement: refactor state res and fix a few bugs in the process --- Cargo.lock | 2 +- Cargo.toml | 4 +- src/client_server/membership.rs | 8 +- src/client_server/sync.rs | 6 +- src/database/pusher.rs | 31 +- src/database/rooms.rs | 302 +++++---- src/database/sending.rs | 7 +- src/server_server.rs | 1124 +++++++++++++------------------ 8 files changed, 651 insertions(+), 833 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index adcc27b..9580942 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2105,7 +2105,7 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res?rev=34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488#34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488" +source = "git+https://github.com/timokoesters/state-res?branch=improvements#1621a491a9e867a1ad4dff9f2f92b0c1e2d44aa0" dependencies = [ "itertools 0.10.0", "log", diff --git a/Cargo.toml b/Cargo.toml index 33f1d1e..453bc8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,10 +23,10 @@ ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-a #ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks"] } # Used when doing state resolution -# state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } +state-res = { git = "https://github.com/timokoesters/state-res", branch = "improvements", features = ["unstable-pre-spec"] } # TODO: remove the gen-eventid feature #state-res = { git = "https://github.com/ruma/state-res", branch = "main", features = ["unstable-pre-spec", "gen-eventid"] } -state-res = { git = 
"https://github.com/ruma/state-res", rev = "34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488", features = ["unstable-pre-spec", "gen-eventid"] } +#state-res = { git = "https://github.com/ruma/state-res", rev = "1621a491a9e867a1ad4dff9f2f92b0c1e2d44aa0", features = ["unstable-pre-spec", "gen-eventid"] } #state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] } # Used for long polling and federation sender, should be the same as rocket::tokio diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 36bbced..6a64ea4 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -21,11 +21,7 @@ use ruma::{ serde::{to_canonical_value, CanonicalJsonObject, Raw}, EventId, RoomId, RoomVersionId, ServerName, UserId, }; -use std::{ - collections::{BTreeMap, HashMap}, - convert::TryFrom, - sync::Arc, -}; +use std::{collections::BTreeMap, convert::TryFrom, sync::Arc}; #[cfg(feature = "conduit_bin")] use rocket::{get, post}; @@ -573,7 +569,7 @@ async fn join_room_by_id_helper( let pdu = PduEvent::from_id_val(&event_id, join_event.clone()) .map_err(|_| Error::BadServerResponse("Invalid PDU in send_join response."))?; - let mut state = HashMap::new(); + let mut state = BTreeMap::new(); for pdu in send_join_response .room_state diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index d38699c..bd7046d 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -1,5 +1,6 @@ use super::State; use crate::{ConduitResult, Database, Error, Ruma}; +use log::error; use ruma::{ api::client::r0::sync::sync_events, events::{room::member::MembershipState, AnySyncEphemeralRoomEvent, EventType}, @@ -505,7 +506,10 @@ pub async fn sync_events_route( db.rooms .pdu_shortstatehash(&pdu.1.event_id) .ok()? 
- .ok_or_else(|| Error::bad_database("Pdu in db doesn't have a state hash.")) + .ok_or_else(|| { + error!("{:?}", pdu.1); + Error::bad_database("Pdu in db doesn't have a state hash.") + }) .ok() }) .and_then(|shortstatehash| { diff --git a/src/database/pusher.rs b/src/database/pusher.rs index f4b35f2..8e9b24e 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -215,7 +215,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -235,7 +235,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -266,8 +266,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -292,8 +291,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -309,7 +307,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -346,8 +344,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -372,8 +369,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) - .await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -389,7 +385,7 @@ pub async fn 
send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -405,7 +401,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -421,7 +417,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -435,7 +431,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -449,7 +445,7 @@ pub async fn send_push_notice( _ => None, }) .collect::>(); - send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; + send_notice(unread, pusher, tweaks, pdu, db).await?; break; } } @@ -466,7 +462,6 @@ async fn send_notice( tweaks: Vec, event: &PduEvent, db: &Database, - name: &str, ) -> Result<()> { // TODO: email if pusher.kind == Some(PusherKind::Email) { @@ -514,7 +509,6 @@ async fn send_notice( } if event_id_only { - error!("SEND PUSH NOTICE `{}`", name); send_request( &db.globals, &url, @@ -542,7 +536,6 @@ async fn send_notice( .flatten(); notifi.room_name = room_name.as_deref(); - error!("SEND PUSH NOTICE Full `{}`", name); send_request( &db.globals, &url, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 175d4ac..e1e97b4 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -3,27 +3,24 @@ mod edus; pub use edus::RoomEdus; use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; -use log::{error, warn}; +use log::{debug, error, warn}; use regex::Regex; use ring::digest; use ruma::{ api::client::error::ErrorKind, events::{ 
ignored_user_list, - room::{ - member, message, - power_levels::{self, PowerLevelsEventContent}, - }, + room::{create::CreateEventContent, member, message}, EventType, }, serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw}, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, }; use sled::IVec; -use state_res::{event_auth, Event, StateMap}; +use state_res::{Event, StateMap}; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, convert::{TryFrom, TryInto}, mem, sync::Arc, @@ -227,26 +224,24 @@ impl Rooms { sender: &UserId, state_key: Option<&str>, content: serde_json::Value, - ) -> Result> { + ) -> Result>> { let auth_events = state_res::auth_types_for_event( kind, sender, state_key.map(|s| s.to_string()), - content, + content.clone(), ); let mut events = StateMap::new(); for (event_type, state_key) in auth_events { - if let Some(pdu) = self.room_state_get( - room_id, - &event_type, - &state_key - .as_deref() - .ok_or_else(|| Error::bad_database("Saved auth event with no state key."))?, - )? { - events.insert((event_type, state_key), pdu); + if let Some(pdu) = self.room_state_get(room_id, &event_type, &state_key)? { + events.insert((event_type, state_key), Arc::new(pdu)); } else { - warn!("Could not find {} {:?} in state", event_type, state_key); + // This is okay because when creating a new room some events were not created yet + debug!( + "{:?}: Could not find {} {:?} in state", + content, event_type, state_key + ); } } Ok(events) @@ -281,7 +276,7 @@ impl Rooms { pub fn force_state( &self, room_id: &RoomId, - state: HashMap<(EventType, String), EventId>, + state: BTreeMap<(EventType, String), EventId>, globals: &super::globals::Globals, ) -> Result<()> { let state_hash = self.calculate_hash( @@ -293,8 +288,10 @@ impl Rooms { let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? 
{ Some(shortstatehash) => { - warn!("state hash already existed?!"); - shortstatehash.to_vec() + // State already existed in db + self.roomid_shortstatehash + .insert(room_id.as_bytes(), &*shortstatehash)?; + return Ok(()); } None => { let shortstatehash = globals.next_count()?; @@ -483,14 +480,11 @@ impl Rooms { } /// Returns the leaf pdus of a room. - pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result> { + pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result> { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); - let mut events = Vec::new(); - - for event in self - .roomid_pduleaves + self.roomid_pduleaves .scan_prefix(prefix) .values() .map(|bytes| { @@ -501,11 +495,7 @@ impl Rooms { .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?, ) }) - { - events.push(event?); - } - - Ok(events) + .collect() } /// Replace the leaves of a room. @@ -761,6 +751,90 @@ impl Rooms { Ok(()) } + /// Generates a new StateHash and associates it with the incoming event. + /// + /// This adds all current state events (not including the incoming event) + /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. + pub fn set_event_state( + &self, + event_id: &EventId, + state: &StateMap>, + globals: &super::globals::Globals, + ) -> Result<()> { + let shorteventid = match self.eventid_shorteventid.get(event_id.as_bytes())? { + Some(shorteventid) => shorteventid.to_vec(), + None => { + let shorteventid = globals.next_count()?; + self.eventid_shorteventid + .insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; + shorteventid.to_be_bytes().to_vec() + } + }; + + let state_hash = self.calculate_hash( + &state + .values() + .map(|pdu| pdu.event_id.as_bytes()) + .collect::>(), + ); + + let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? 
{ + Some(shortstatehash) => { + // State already existed in db + self.shorteventid_shortstatehash + .insert(shorteventid, &*shortstatehash)?; + return Ok(()); + } + None => { + let shortstatehash = globals.next_count()?; + self.statehash_shortstatehash + .insert(&state_hash, &shortstatehash.to_be_bytes())?; + shortstatehash.to_be_bytes().to_vec() + } + }; + + for ((event_type, state_key), pdu) in state { + let mut statekey = event_type.as_ref().as_bytes().to_vec(); + statekey.push(0xff); + statekey.extend_from_slice(&state_key.as_bytes()); + + let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { + Some(shortstatekey) => shortstatekey.to_vec(), + None => { + let shortstatekey = globals.next_count()?; + self.statekey_shortstatekey + .insert(&statekey, &shortstatekey.to_be_bytes())?; + shortstatekey.to_be_bytes().to_vec() + } + }; + + let shorteventid = match self.eventid_shorteventid.get(pdu.event_id.as_bytes())? { + Some(shorteventid) => shorteventid.to_vec(), + None => { + let shorteventid = globals.next_count()?; + self.eventid_shorteventid + .insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?; + shorteventid.to_be_bytes().to_vec() + } + }; + + let mut state_id = shortstatehash.clone(); + state_id.extend_from_slice(&shortstatekey); + + self.stateid_shorteventid + .insert(&*state_id, &*shorteventid)?; + } + + self.shorteventid_shortstatehash + .insert(shorteventid, &*shortstatehash)?; + + Ok(()) + } + /// Generates a new StateHash and associates it with the incoming event. /// /// This adds all current state events (not including the incoming event) @@ -900,8 +974,37 @@ impl Rooms { redacts, } = pdu_builder; // TODO: Make sure this isn't called twice in parallel - let mut prev_events = self.get_pdu_leaves(&room_id)?; - prev_events.truncate(20); + let prev_events = self + .get_pdu_leaves(&room_id)? 
+ .into_iter() + .take(20) + .collect::>(); + + let create_event = self.room_state_get(&room_id, &EventType::RoomCreate, "")?; + + let create_event_content = create_event + .as_ref() + .map(|create_event| { + Ok::<_, Error>( + serde_json::from_value::>(create_event.content.clone()) + .expect("Raw::from_value always works.") + .deserialize() + .map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?, + ) + }) + .transpose()?; + + let create_prev_event = if prev_events.len() == 1 + && Some(&prev_events[0]) == create_event.as_ref().map(|c| &c.event_id) + { + create_event.map(Arc::new) + } else { + None + }; + + let room_version = create_event_content.map_or(RoomVersionId::Version6, |create_event| { + create_event.room_version + }); let auth_events = self.get_auth_events( &room_id, @@ -911,118 +1014,6 @@ impl Rooms { content.clone(), )?; - // Is the event authorized? - if let Some(state_key) = &state_key { - let power_levels = self - .room_state_get(&room_id, &EventType::RoomPowerLevels, "")? - .map_or_else( - || { - Ok::<_, Error>(power_levels::PowerLevelsEventContent { - ban: 50.into(), - events: BTreeMap::new(), - events_default: 0.into(), - invite: 50.into(), - kick: 50.into(), - redact: 50.into(), - state_default: 0.into(), - users: BTreeMap::new(), - users_default: 0.into(), - notifications: - ruma::events::room::power_levels::NotificationPowerLevels { - room: 50.into(), - }, - }) - }, - |power_levels| { - Ok(serde_json::from_value::>( - power_levels.content, - ) - .expect("Raw::from_value always works.") - .deserialize() - .map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?) - }, - )?; - let sender_membership = self - .room_state_get(&room_id, &EventType::RoomMember, &sender.to_string())? - .map_or(Ok::<_, Error>(member::MembershipState::Leave), |pdu| { - Ok( - serde_json::from_value::>(pdu.content) - .expect("Raw::from_value always works.") - .deserialize() - .map_err(|_| Error::bad_database("Invalid Member event in db."))? 
- .membership, - ) - })?; - - let sender_power = power_levels.users.get(&sender).map_or_else( - || { - if sender_membership != member::MembershipState::Join { - None - } else { - Some(&power_levels.users_default) - } - }, - // If it's okay, wrap with Some(_) - Some, - ); - - // Is the event allowed? - #[allow(clippy::blocks_in_if_conditions)] - if !match event_type { - EventType::RoomEncryption => { - // Only allow encryption events if it's allowed in the config - db.globals.allow_encryption() - } - EventType::RoomMember => { - let prev_event = self - .get_pdu(prev_events.get(0).ok_or(Error::BadRequest( - ErrorKind::Unknown, - "Membership can't be the first event", - ))?)? - .map(Arc::new); - event_auth::valid_membership_change( - Some(state_key.as_str()), - &sender, - content.clone(), - prev_event, - None, // TODO: third party invite - &auth_events - .iter() - .map(|((ty, key), pdu)| { - Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone()))) - }) - .collect::>>()?, - ) - .map_err(|e| { - log::error!("{}", e); - Error::Conflict("Found incoming PDU with invalid data.") - })? - } - EventType::RoomCreate => prev_events.is_empty(), - // Not allow any of the following events if the sender is not joined. - _ if sender_membership != member::MembershipState::Join => false, - _ => { - // TODO - sender_power.unwrap_or(&power_levels.users_default) - >= &power_levels.state_default - } - } { - error!("Unauthorized {}", event_type); - // Not authorized - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized", - )); - } - } else if !self.is_joined(&sender, &room_id)? 
{ - // TODO: auth rules apply to all events, not only those with a state key - error!("Unauthorized {}", event_type); - return Err(Error::BadRequest( - ErrorKind::Forbidden, - "Event is not authorized", - )); - } - // Our depth is the maximum depth of prev_events + 1 let depth = prev_events .iter() @@ -1057,8 +1048,8 @@ impl Rooms { .try_into() .map_err(|_| Error::bad_database("Depth is invalid"))?, auth_events: auth_events - .into_iter() - .map(|(_, pdu)| pdu.event_id) + .iter() + .map(|(_, pdu)| pdu.event_id.clone()) .collect(), redacts, unsigned, @@ -1068,6 +1059,23 @@ impl Rooms { signatures: BTreeMap::new(), }; + if !state_res::auth_check( + &room_version, + &Arc::new(pdu.clone()), + create_prev_event, + &auth_events, + None, // TODO: third_party_invite + ) + .map_err(|e| { + error!("{:?}", e); + Error::bad_database("Auth check failed.") + })? { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Event is not authorized.", + )); + } + // Hash and sign let mut pdu_json = utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); diff --git a/src/database/sending.rs b/src/database/sending.rs index 1cc2f91..b792479 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -175,8 +175,7 @@ impl Sending { // servercurrentpdus with the prefix should be empty now } } - Err((outgoing_kind, e)) => { - info!("Couldn't send transaction to {:?}\n{}", outgoing_kind, e); + Err((outgoing_kind, _)) => { let mut prefix = match &outgoing_kind { OutgoingKind::Appservice(serv) => { let mut p = b"+".to_vec(); @@ -217,7 +216,7 @@ impl Sending { let exponential_backoff = |(tries, instant): &(u32, Instant)| { // Fail if a request has failed recently (exponential backoff) - let mut min_elapsed_duration = Duration::from_secs(60) * (*tries) * (*tries); + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); if min_elapsed_duration > Duration::from_secs(60*60*24) { min_elapsed_duration = Duration::from_secs(60*60*24); } @@ 
-261,6 +260,8 @@ impl Sending { servercurrentpdus.insert(&key, &[]).unwrap(); servernamepduids.remove(&key).unwrap(); + last_failed_try.remove(&outgoing_kind); + futures.push( Self::handle_event( outgoing_kind, diff --git a/src/server_server.rs b/src/server_server.rs index 8babc89..e461b5a 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -22,12 +22,12 @@ use ruma::{ directory::{IncomingFilter, IncomingRoomNetwork}, events::EventType, serde::to_canonical_value, - signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, + signatures::CanonicalJsonValue, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use state_res::{Event, EventMap, StateMap}; use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{btree_map::Entry, BTreeMap, BTreeSet, HashSet}, convert::TryFrom, fmt::Debug, future::Future, @@ -180,13 +180,7 @@ where .collect::>(); if status != 200 { - info!( - "Server returned bad response {} {}\n{}\n{:?}", - destination, - status, - url, - utils::string_from_bytes(&body) - ); + info!("{} {}:\n{}", url, status, String::from_utf8_lossy(&body),); } let response = T::IncomingResponse::try_from( @@ -194,13 +188,7 @@ where .body(body) .expect("reqwest body is valid http body"), ); - response.map_err(|_| { - info!( - "Server returned invalid response bytes {}\n{}", - destination, url - ); - Error::BadServerResponse("Server returned bad response.") - }) + response.map_err(|_| Error::BadServerResponse("Server returned bad response.")) } Err(e) => Err(e.into()), } @@ -508,8 +496,6 @@ pub async fn send_transaction_message_route<'a>( return Err(Error::bad_config("Federation is disabled.")); } - info!("Incoming PDUs: {:?}", &body.pdus); - for edu in &body.edus { match serde_json::from_str::(edu.json().get()) { Ok(edu) => match edu.edu_type.as_str() { @@ -548,385 +534,19 @@ pub async fn send_transaction_message_route<'a>( let mut resolved_map = BTreeMap::new(); - let pdus_to_resolve = body - .pdus - .iter() - 
.filter_map(|pdu| { - // 1. Is a valid event, otherwise it is dropped. - // Ruma/PduEvent/StateEvent satisfies this - // We do not add the event_id field to the pdu here because of signature and hashes checks - let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(pdu) { - Ok(t) => t, - Err(_) => { - // Event could not be converted to canonical json - return None; - } - }; - - // If we have no idea about this room skip the PDU - let room_id = match value - .get("room_id") - .map(|id| match id { - CanonicalJsonValue::String(id) => RoomId::try_from(id.as_str()).ok(), - _ => None, - }) - .flatten() - { - Some(id) => id, - None => { - resolved_map.insert(event_id, Err("Event needs a valid RoomId".to_string())); - return None; - } - }; - - // 1. check the server is in the room (optional) - match db.rooms.exists(&room_id) { - Ok(true) => {} - _ => { - resolved_map - .insert(event_id, Err("Room is unknown to this server".to_string())); - return None; - } - } - - // If we know of this pdu we don't need to continue processing it - if let Ok(Some(_)) = db.rooms.get_pdu_id(&event_id) { - return None; - } - - Some((event_id, room_id, value)) - }) - .collect::>(); - - // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere? - // SPEC: - // Servers MUST strictly enforce the JSON format specified in the appendices. - // This translates to a 400 M_BAD_JSON error on most endpoints, or discarding of - // events over federation. For example, the Federation API's /send endpoint would - // discard the event whereas the Client Server API's /send/{eventType} endpoint - // would return a M_BAD_JSON error. - 'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve { - info!("Working on incoming pdu: {:?}", value); - let server_name = &body.body.origin; - let mut pub_key_map = BTreeMap::new(); - - // TODO: make this persist but not a DB Tree... 
- // This is all the auth_events that have been recursively fetched so they don't have to be - // deserialized over and over again. This could potentially also be some sort of trie (suffix tree) - // like structure so that once an auth event is known it would know (using indexes maybe) all of - // the auth events that it references. - let mut auth_cache = EventMap::new(); - - // 2. check content hash, redact if doesn't match - // 3. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events - // 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" - // 5. reject "due to auth events" if the event doesn't pass auth based on the auth events - // 7. if not timeline event: stop - // TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - // the events found in step 8 can be authed/resolved and appended to the DB - let (pdu, previous_create): (Arc, Option>) = match validate_event( - &db, - value, - event_id.clone(), - &mut pub_key_map, - server_name, - // All the auth events gathered will be here - &mut auth_cache, - ) - .await - { - Ok(pdu) => pdu, - Err(e) => { - resolved_map.insert(event_id, Err(e)); - continue; - } - }; - debug!("Validated event."); - - // 6. persist the event as an outlier. - db.rooms.add_pdu_outlier(&pdu)?; - info!("Added pdu as outlier."); - - // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all - // the checks in this list starting at 1. These are not timeline events. - // - // Step 10. 
check the auth of the event passes based on the calculated state of the event - // - // TODO: if we know the prev_events of the incoming event we can avoid the request and build - // the state from a known point and resolve if > 1 prev_event - debug!("Requesting state at event."); - let (state_at_event, incoming_auth_events): (StateMap>, Vec>) = - match db - .sending - .send_federation_request( - &db.globals, - server_name, - get_room_state_ids::v1::Request { - room_id: pdu.room_id(), - event_id: pdu.event_id(), - }, - ) - .await - { - Ok(res) => { - debug!("Fetching state events at event."); - let state = match fetch_events( - &db, - server_name, - &mut pub_key_map, - &res.pdu_ids, - &mut auth_cache, - ) - .await - { - Ok(state) => state, - Err(_) => continue, - }; - - // Sanity check: there are no conflicting events in the state we received - let mut seen = BTreeSet::new(); - for ev in &state { - // If the key is already present - if !seen.insert((&ev.kind, &ev.state_key)) { - error!("Server sent us an invalid state"); - continue; - } - } - - let state = state - .into_iter() - .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) - .collect(); - - let incoming_auth_events = match fetch_events( - &db, - server_name, - &mut pub_key_map, - &res.auth_chain_ids, - &mut auth_cache, - ) - .await - { - Ok(state) => state, - Err(_) => continue, - }; - - debug!("Fetching auth events of state events at event."); - (state, incoming_auth_events) - } - Err(_) => { - resolved_map.insert( - pdu.event_id().clone(), - Err("Fetching state for event failed".into()), - ); - continue; - } - }; - - // 10. This is the actual auth check for state at the event - if !state_res::event_auth::auth_check( - &RoomVersionId::Version6, - &pdu, - previous_create.clone(), - &state_at_event, - None, // TODO: third party invite - ) - .map_err(|_e| Error::Conflict("Auth check failed"))? 
- { - // Event failed auth with state_at - resolved_map.insert( - event_id, - Err("Event has failed auth check with state at the event".into()), - ); - continue; - } - debug!("Auth check succeeded."); - // End of step 10. - - // 12. check if the event passes auth based on the "current state" of the room, if not "soft fail" it - let current_state = db - .rooms - .room_state_full(pdu.room_id())? - .into_iter() - .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) - .collect(); - - if !state_res::event_auth::auth_check( - &RoomVersionId::Version6, - &pdu, - previous_create, - ¤t_state, - None, - ) - .map_err(|_e| Error::Conflict("Auth check failed"))? - { - // Soft fail, we add the event as an outlier. - resolved_map.insert( - pdu.event_id().clone(), - Err("Event has been soft failed".into()), - ); - continue; - }; - debug!("Auth check with current state succeeded."); - - // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res - // where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote) - // - // calculate_forward_extremities takes care of adding the current state if not already in the state sets - // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. 
- let extremities = match calculate_forward_extremities(&db, &pdu).await { - Ok(fork_ids) => { - debug!("Calculated new forward extremities: {:?}", fork_ids); - fork_ids - } + for pdu in body.pdus.iter() { + // We do not add the event_id field to the pdu here because of signature and hashes checks + let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(pdu) { + Ok(t) => t, Err(_) => { - resolved_map.insert(event_id, Err("Failed to gather forward extremities".into())); + // Event could not be converted to canonical json continue; } }; - // This will create the state after any state snapshot it builds - // So current_state will have the incoming event inserted to it - let mut fork_states = - match build_forward_extremity_snapshots(&db, pdu.clone(), current_state, &extremities) - .await - { - Ok(states) => states, - Err(_) => { - resolved_map - .insert(event_id, Err("Failed to gather forward extremities".into())); - continue; - } - }; - - // Make this the state after. - let mut state_after = state_at_event.clone(); - state_after.insert((pdu.kind(), pdu.state_key()), pdu.clone()); - // Add the incoming event to the mix of state snapshots - // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets - fork_states.insert(state_after.clone()); - - let fork_states = fork_states.into_iter().collect::>(); - - let mut update_state = false; - // 13. 
start state-res with all previous forward extremities minus the ones that are in - // the prev_events of this event plus the new one created by this event and use - // the result as the new room state - let state_at_forks = if fork_states.is_empty() { - // State is empty - Default::default() - } else if fork_states.len() == 1 { - fork_states[0].clone() - } else { - // We do need to force an update to this rooms state - update_state = true; - - let mut auth_events = vec![]; - for map in &fork_states { - let mut state_auth = vec![]; - for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { - match fetch_events( - &db, - server_name, - &mut pub_key_map, - &[auth_id.clone()], - &mut auth_cache, - ) - .await - { - // This should always contain exactly one element when Ok - Ok(events) => state_auth.push(events[0].clone()), - Err(e) => { - debug!("Event was not present: {}", e); - } - } - } - auth_events.push(state_auth); - } - - // Add everything we will need to event_map - auth_cache.extend( - auth_events - .iter() - .map(|pdus| pdus.iter().map(|pdu| (pdu.event_id().clone(), pdu.clone()))) - .flatten(), - ); - auth_cache.extend( - incoming_auth_events - .into_iter() - .map(|pdu| (pdu.event_id().clone(), pdu)), - ); - auth_cache.extend( - state_after - .into_iter() - .map(|(_, pdu)| (pdu.event_id().clone(), pdu)), - ); - - let res = match state_res::StateResolution::resolve( - pdu.room_id(), - &RoomVersionId::Version6, - &fork_states - .into_iter() - .map(|map| { - map.into_iter() - .map(|(k, v)| (k, v.event_id.clone())) - .collect::>() - }) - .collect::>(), - auth_events - .into_iter() - .map(|pdus| pdus.into_iter().map(|pdu| pdu.event_id().clone()).collect()) - .collect(), - &mut auth_cache, - ) { - Ok(res) => res, - Err(_) => { - resolved_map.insert( - pdu.event_id().clone(), - Err("State resolution failed, either an event could not be found or deserialization".into()), - ); - continue 'main_pdu_loop; - } - }; - - let mut resolved = BTreeMap::new(); - for (k, 
id) in res { - // We should know of the event but just incase - let pdu = match auth_cache.get(&id) { - Some(pdu) => pdu.clone(), - None => { - error!("Event was not present in auth_cache {}", id); - resolved_map.insert( - event_id.clone(), - Err("Event was not present in auth cache".into()), - ); - continue 'main_pdu_loop; - } - }; - resolved.insert(k, pdu); - } - resolved - }; - - // Now that the event has passed all auth it is added into the timeline. - // We use the `state_at_event` instead of `state_after` so we accurately - // represent the state for this event. - append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; - info!("Appended incoming pdu."); - - // Set the new room state to the resolved state - update_resolved_state( - &db, - pdu.room_id(), - if update_state { - Some(state_at_forks) - } else { - None - }, - )?; - debug!("Updated resolved state"); - - // Event has passed all auth/stateres checks + if let Err(e) = handle_incoming_pdu(&body.origin, &event_id, value, true, &db).await { + resolved_map.insert(event_id, Err(e)); + } } if !resolved_map.is_empty() { @@ -939,26 +559,80 @@ pub async fn send_transaction_message_route<'a>( /// An async function that can recursively calls itself. type AsyncRecursiveResult<'a, T> = Pin> + 'a + Send>>; -/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events -/// Validate any event that is given to us by another server. -/// -/// 1. Is a valid event, otherwise it is dropped (PduEvent deserialization satisfies this). -/// 2. check content hash, redact if doesn't match -/// 3. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events -/// 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" -/// 5. reject "due to auth events" if the event doesn't pass auth based on the auth events -/// 7. if not timeline event: stop -/// 8. 
fetch any missing prev events doing all checks listed here starting at 1. These are timeline events -#[tracing::instrument(skip(db, pub_key_map, auth_cache))] -fn validate_event<'a>( - db: &'a Database, - value: CanonicalJsonObject, - event_id: EventId, - pub_key_map: &'a mut PublicKeyMap, +/// When receiving an event one needs to: +/// 0. Skip the PDU if we already know about it +/// 1. Check the server is in the room +/// 2. Check signatures, otherwise drop +/// 3. Check content hash, redact if doesn't match +/// 4. Fetch any missing auth events doing all checks listed here starting at 1. These are not +/// timeline events +/// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are +/// also rejected "due to auth events" +/// 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events +/// 7. Persist this event as an outlier +/// 8. If not timeline event: stop +/// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline +/// events +/// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities +/// doing all the checks in this list starting at 1. These are not timeline events +/// 11. Check the auth of the event passes based on the state of the event +/// 12. Ensure that the state is derived from the previous current state (i.e. we calculated by +/// doing state res where one of the inputs was a previously trusted set of state, don't just +/// trust a set of state we got from a remote) +/// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" +/// it +/// 14. 
Use state resolution to find new room state +// We use some AsyncRecursiveResult hacks here so we can call this async function recursively fn handle_incoming_pdu<'a>( origin: &'a ServerName, - auth_cache: &'a mut EventMap>, -) -> AsyncRecursiveResult<'a, (Arc, Option>)> { + event_id: &'a EventId, + value: BTreeMap, + is_timeline_event: bool, + db: &'a Database, +) -> AsyncRecursiveResult<'a, Arc> { Box::pin(async move { + // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json + + // 0. Skip the PDU if we already know about it + if let Ok(Some(pdu)) = db.rooms.get_pdu(&event_id) { + return Ok(Arc::new(pdu)); + } + + // 1. Check the server is in the room + let room_id = match value + .get("room_id") + .map(|id| match id { + CanonicalJsonValue::String(id) => RoomId::try_from(id.as_str()).ok(), + _ => None, + }) + .flatten() + { + Some(id) => id, + None => { + // Event is invalid + return Err("Event needs a valid RoomId".to_string()); + } + }; + + match db.rooms.exists(&room_id) { + Ok(true) => {} + _ => { + return Err("Room is unknown to this server".to_string()); + } + } + + let mut pub_key_map = BTreeMap::new(); + + // This is all the auth_events that have been recursively fetched so they don't have to be + // deserialized over and over again. + // TODO: make this persist across requests but not in a DB Tree (in globals?) + // TODO: This could potentially also be some sort of trie (suffix tree) like structure so + // that once an auth event is known it would know (using indexes maybe) all of the auth + // events that it references. + let mut auth_cache = EventMap::new(); + + // We go through all the signatures we see on the value and fetch the corresponding signing + // keys for (signature_server, signature) in match value .get("signatures") .ok_or_else(|| "No signatures in server response pdu.".to_string())? 
@@ -998,132 +672,443 @@ fn validate_event<'a>( pub_key_map.insert(signature_server.clone(), keys); } + // 2. Check signatures, otherwise drop + // 3. check content hash, redact if doesn't match let mut val = match ruma::signatures::verify_event(&pub_key_map, &value, &RoomVersionId::Version5) { - Ok(ver) => { - if let ruma::signatures::Verified::Signatures = ver { - match ruma::signatures::redact(&value, &RoomVersionId::Version6) { - Ok(obj) => obj, - Err(_) => return Err("Redaction failed".to_string()), - } - } else { - value - } - } Err(e) => { + // Drop error!("{:?}: {}", value, e); return Err("Signature verification failed".to_string()); } + Ok(ruma::signatures::Verified::Signatures) => { + // Redact + match ruma::signatures::redact(&value, &RoomVersionId::Version6) { + Ok(obj) => obj, + Err(_) => return Err("Redaction failed".to_string()), + } + } + Ok(ruma::signatures::Verified::All) => value, }; // Now that we have checked the signature and hashes we can add the eventID and convert - // to our PduEvent type also finally verifying the first step listed above + // to our PduEvent type val.insert( "event_id".to_owned(), to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"), ); - let pdu = serde_json::from_value::( + let incoming_pdu = serde_json::from_value::( serde_json::to_value(val).expect("CanonicalJsonObj is a valid JsonValue"), ) - .map_err(|_| "Event is not a valid PDU".to_string())?; + .map_err(|_| "Event is not a valid PDU.".to_string())?; + // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events + // 5. 
Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" debug!("Fetching auth events."); - fetch_check_auth_events(db, origin, pub_key_map, &pdu.auth_events, auth_cache) + fetch_and_handle_events(db, origin, &incoming_pdu.auth_events, &mut auth_cache) .await .map_err(|e| e.to_string())?; - let pdu = Arc::new(pdu.clone()); + // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events + debug!("Checking auth."); - /* - // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - debug!("Fetching prev events."); - let previous = fetch_events(&db, origin, pub_key_map, &pdu.prev_events, auth_cache) - .await - .map_err(|e| e.to_string())?; - */ + // Build map of auth events + let mut auth_events = BTreeMap::new(); + for id in incoming_pdu.auth_events.iter() { + let auth_event = auth_cache.get(id).ok_or_else(|| { + "Auth event not found, event failed recursive auth checks.".to_string() + })?; - // if the previous event was the create event special rules apply - let previous_create = if pdu.auth_events.len() == 1 && pdu.prev_events == pdu.auth_events { - auth_cache.get(&pdu.auth_events[0]).cloned() + match auth_events.entry(( + auth_event.kind.clone(), + auth_event + .state_key + .clone() + .expect("all auth events have state keys"), + )) { + Entry::Vacant(v) => { + v.insert(auth_event.clone()); + } + Entry::Occupied(_) => { + return Err( + "Auth event's type and state_key combination exists multiple times." + .to_owned(), + ) + } + } + } + + let create_event = db + .rooms + .room_state_get(&incoming_pdu.room_id, &EventType::RoomCreate, "") + .map_err(|_| "Failed to ask database for event.")? 
+ .ok_or_else(|| "Failed to find create event in db.")?; + + // The original create event must be in the auth events + if auth_events + .get(&(EventType::RoomCreate, "".to_owned())) + .map(|a| a.as_ref()) + != Some(&create_event) + { + return Err("Incoming event refers to wrong create event.".to_owned()); + } + + // If the previous event was the create event special rules apply + let previous_create = if incoming_pdu.auth_events.len() == 1 + && incoming_pdu.prev_events == incoming_pdu.auth_events + { + auth_cache + .get(&incoming_pdu.auth_events[0]) + .cloned() + .filter(|maybe_create| **maybe_create == create_event) } else { None }; - // Check that the event passes auth based on the auth_events - debug!("Checking auth."); - let is_authed = state_res::event_auth::auth_check( + let incoming_pdu = Arc::new(incoming_pdu.clone()); + + if !state_res::event_auth::auth_check( &RoomVersionId::Version6, - &pdu, + &incoming_pdu, previous_create.clone(), - &pdu.auth_events - .iter() - .map(|id| { - auth_cache - .get(id) - .map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone())) - .ok_or_else(|| { - "Auth event not found, event failed recursive auth checks.".to_string() - }) - }) - .collect::, _>>()?, + &auth_events, None, // TODO: third party invite ) - .map_err(|_e| "Auth check failed".to_string())?; - - if !is_authed { - return Err("Event has failed auth check with auth events".to_string()); + .map_err(|_e| "Auth check failed".to_string())? + { + return Err("Event has failed auth check with auth events.".to_string()); } debug!("Validation successful."); - Ok((pdu, previous_create)) - }) -} -#[tracing::instrument(skip(db, key_map, auth_cache))] -async fn fetch_check_auth_events( - db: &Database, - origin: &ServerName, - key_map: &mut PublicKeyMap, - event_ids: &[EventId], - auth_cache: &mut EventMap>, -) -> Result<()> { - fetch_events(db, origin, key_map, event_ids, auth_cache).await?; - Ok(()) + // 7. Persist the event as an outlier. 
+ db.rooms + .add_pdu_outlier(&incoming_pdu) + .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; + debug!("Added pdu as outlier."); + + // 8. if not timeline event: stop + if !is_timeline_event { + return Ok(incoming_pdu); + } + + // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events + + // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities + // doing all the checks in this list starting at 1. These are not timeline events. + + // TODO: if we know the prev_events of the incoming event we can avoid the request and build + // the state from a known point and resolve if > 1 prev_event + + debug!("Requesting state at event."); + let (state_at_incoming_event, incoming_auth_events): (StateMap>, Vec>) = + // Call /state_ids to find out what the state at this pdu is. We trust the server's + // response to some extent, but we still do a lot of checks on the events + match db + .sending + .send_federation_request( + &db.globals, + origin, + get_room_state_ids::v1::Request { + room_id: &incoming_pdu.room_id, + event_id: &incoming_pdu.event_id, + }, + ) + .await + { + Ok(res) => { + debug!("Fetching state events at event."); + let state_vec = match fetch_and_handle_events( + &db, + origin, + &res.pdu_ids, + &mut auth_cache, + ) + .await + { + Ok(state) => state, + Err(_) => return Err("Failed to fetch state events.".to_owned()), + }; + + let mut state = BTreeMap::new(); + for pdu in state_vec.into_iter() { + match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { + Entry::Vacant(v) => { + v.insert(pdu); + } + Entry::Occupied(_) => { + return Err( + "State event's type and state_key combination exists multiple times.".to_owned(), + ) + } + } + } + + // The original create event must still be in the state + if state.get(&(EventType::RoomCreate, "".to_owned())).map(|a| a.as_ref()) != 
Some(&create_event) { + return Err("Incoming event refers to wrong create event.".to_owned()); + } + + debug!("Fetching auth chain events at event."); + let incoming_auth_events = match fetch_and_handle_events( + &db, + origin, + &res.auth_chain_ids, + &mut auth_cache, + ) + .await + { + Ok(state) => state, + Err(_) => return Err("Failed to fetch auth chain.".to_owned()), + }; + + (state, incoming_auth_events) + } + Err(_) => { + return Err("Fetching state for event failed".into()); + } + }; + + // 11. Check the auth of the event passes based on the state of the event + if !state_res::event_auth::auth_check( + &RoomVersionId::Version6, + &incoming_pdu, + previous_create.clone(), + &state_at_incoming_event, + None, // TODO: third party invite + ) + .map_err(|_e| "Auth check failed.".to_owned())? + { + return Err("Event has failed auth check with state at the event.".into()); + } + debug!("Auth check succeeded."); + + // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it + let current_state = db + .rooms + .room_state_full(incoming_pdu.room_id()) + .map_err(|_| "Failed to load room state.".to_owned())? + .into_iter() + .map(|(k, v)| (k, Arc::new(v))) + .collect(); + + if !state_res::event_auth::auth_check( + &RoomVersionId::Version6, // TODO: Use correct room version + &incoming_pdu, + previous_create, + ¤t_state, + None, + ) + .map_err(|_e| "Auth check failed.".to_owned())? + { + // Soft fail, we leave the event as an outlier but don't add it to the timeline + return Err("Event has been soft failed".into()); + }; + debug!("Auth check with current state succeeded."); + + // Now we calculate the set of extremities this room has after the incoming event has been + // applied. 
We start with the previous extremities (aka leaves) + let mut extremities = db + .rooms + .get_pdu_leaves(&incoming_pdu.room_id) + .map_err(|_| "Failed to load room leaves".to_owned())?; + + // Remove any forward extremities that are referenced by this incoming event's prev_events + for prev_event in &incoming_pdu.prev_events { + if extremities.contains(prev_event) { + extremities.remove(prev_event); + } + } + + let mut fork_states = BTreeSet::new(); + for id in &extremities { + match db.rooms.get_pdu(&id).map_err(|_| "Failed to ask db for pdu.".to_owned())? { + Some(leaf_pdu) => { + let pdu_shortstatehash = db + .rooms + .pdu_shortstatehash(&leaf_pdu.event_id) + .map_err(|_| "Failed to ask db for pdu state hash.".to_owned())? + .ok_or_else(|| { + error!( + "Found extremity pdu with no statehash in db: {:?}", + leaf_pdu + ); + "Found pdu with no statehash in db.".to_owned() + })?; + + let mut leaf_state = db + .rooms + .state_full(pdu_shortstatehash) + .map_err(|_| "Failed to ask db for room state.".to_owned())? + .into_iter() + .map(|(k, v)| (k, Arc::new(v))) + .collect::>(); + + if let Some(state_key) = &leaf_pdu.state_key { + // Now it's the state after + let key = (leaf_pdu.kind.clone(), state_key.clone()); + leaf_state.insert(key, Arc::new(leaf_pdu)); + } + + fork_states.insert(leaf_state); + } + _ => { + error!("Missing state snapshot for {:?}", id); + return Err("Missing state snapshot.".to_owned()); + } + } + } + + // 12. Ensure that the state is derived from the previous current state (i.e. we calculated + // by doing state res where one of the inputs was a previously trusted set of state, + // don't just trust a set of state we got from a remote). 
+ + // We do this by adding the current state to the list of fork states + fork_states.insert(current_state); + + // We also add state after incoming event to the fork states + let mut state_after = state_at_incoming_event.clone(); + if let Some(state_key) = &incoming_pdu.state_key { + state_after.insert( + (incoming_pdu.kind.clone(), state_key.clone()), + incoming_pdu.clone(), + ); + } + fork_states.insert(state_after.clone()); + + let fork_states = fork_states.into_iter().collect::>(); + + let mut update_state = false; + // 14. Use state resolution to find new room state + let new_room_state = if fork_states.is_empty() { + return Err("State is empty.".to_owned()); + } else if fork_states.len() == 1 { + // There was only one state, so it has to be the room's current state (because that is + // always included) + info!("Skipping stateres because there is no new state."); + fork_states[0] + .iter() + .map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) + .collect() + } else { + // We do need to force an update to this room's state + update_state = true; + + let mut auth_events = vec![]; + for map in &fork_states { + let mut state_auth = vec![]; + for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { + match fetch_and_handle_events(&db, origin, &[auth_id.clone()], &mut auth_cache) + .await + { + // This should always contain exactly one element when Ok + Ok(events) => state_auth.push(events[0].clone()), + Err(e) => { + debug!("Event was not present: {}", e); + } + } + } + auth_events.push(state_auth); + } + + // Add everything we will need to event_map + auth_cache.extend( + auth_events + .iter() + .map(|pdus| pdus.iter().map(|pdu| (pdu.event_id().clone(), pdu.clone()))) + .flatten(), + ); + auth_cache.extend( + incoming_auth_events + .into_iter() + .map(|pdu| (pdu.event_id().clone(), pdu)), + ); + auth_cache.extend( + state_after + .into_iter() + .map(|(_, pdu)| (pdu.event_id().clone(), pdu)), + ); + + match state_res::StateResolution::resolve( + 
&incoming_pdu.room_id, + &RoomVersionId::Version6, + &fork_states + .into_iter() + .map(|map| { + map.into_iter() + .map(|(k, v)| (k, v.event_id.clone())) + .collect::>() + }) + .collect::>(), + auth_events + .into_iter() + .map(|pdus| pdus.into_iter().map(|pdu| pdu.event_id().clone()).collect()) + .collect(), + &mut auth_cache, + ) { + Ok(new_state) => new_state, + Err(_) => { + return Err("State resolution failed, either an event could not be found or deserialization".into()); + } + } + }; + + // Now that the event has passed all auth it is added into the timeline. + // We use the `state_at_event` instead of `state_after` so we accurately + // represent the state for this event. + append_incoming_pdu(&db, &incoming_pdu, extremities, &state_at_incoming_event) + .map_err(|_| "Failed to add pdu to db.".to_owned())?; + debug!("Appended incoming pdu."); + + // Set the new room state to the resolved state + if update_state { + db.rooms + .force_state(&room_id, new_room_state, &db.globals) + .map_err(|_| "Failed to set new room state.".to_owned())?; + } + debug!("Updated resolved state"); + + // Event has passed all auth/stateres checks + Ok(incoming_pdu) + }) } /// Find the event and auth it. Once the event is validated (steps 1 - 8) /// it is appended to the outliers Tree. /// -/// 0. Look in the auth_cache -/// 1. Look in the main timeline (pduid_pdu tree) -/// 2. Look at outlier pdu tree -/// 3. Ask origin server over federation -/// 4. TODO: Ask other servers over federation? +/// a. Look in the auth_cache +/// b. Look in the main timeline (pduid_pdu tree) +/// c. Look at outlier pdu tree +/// d. Ask origin server over federation +/// e. TODO: Ask other servers over federation? /// /// If the event is unknown to the `auth_cache` it is added. This guarantees that any /// event we need to know of will be present. 
//#[tracing::instrument(skip(db, key_map, auth_cache))] -pub(crate) async fn fetch_events( +pub(crate) async fn fetch_and_handle_events( db: &Database, origin: &ServerName, - key_map: &mut PublicKeyMap, events: &[EventId], auth_cache: &mut EventMap>, ) -> Result>> { let mut pdus = vec![]; for id in events { + // a. Look at auth cache let pdu = match auth_cache.get(id) { Some(pdu) => { debug!("Event found in cache"); pdu.clone() } - // `get_pdu` checks the outliers tree for us + // b. Look in the main timeline (pduid_pdu tree) + // c. Look at outlier pdu tree + // (get_pdu checks both) None => match db.rooms.get_pdu(&id)? { Some(pdu) => { debug!("Event found in outliers"); Arc::new(pdu) } None => { + // d. Ask origin server over federation debug!("Fetching event over federation: {:?}", id); match db .sending @@ -1138,16 +1123,13 @@ pub(crate) async fn fetch_events( debug!("Got event over federation: {:?}", res); let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu)?; - let (pdu, _) = - validate_event(db, value, event_id, key_map, origin, auth_cache) - .await - .map_err(|e| { - error!("ERROR: {:?}", e); - Error::Conflict("Authentication of event failed") - })?; + let pdu = handle_incoming_pdu(origin, &event_id, value, false, db) + .await + .map_err(|e| { + error!("Error: {:?}", e); + Error::Conflict("Authentication of event failed") + })?; - debug!("Added fetched pdu as outlier."); - db.rooms.add_pdu_outlier(&pdu)?; pdu } Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), @@ -1253,180 +1235,15 @@ pub(crate) async fn fetch_signing_keys( )) } -/// Gather all state snapshots needed to resolve the current state of the room. -/// -/// Step 11. ensure that the state is derived from the previous current state (i.e. we calculated by doing state res -/// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote). 
-/// -/// The state snapshot of the incoming event __needs__ to be added to the resulting list. -#[tracing::instrument(skip(db))] -pub(crate) async fn calculate_forward_extremities( - db: &Database, - pdu: &PduEvent, -) -> Result> { - let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; - - let mut is_incoming_leaf = true; - // Make sure the incoming event is not already a forward extremity - // FIXME: I think this could happen if different servers send us the same event?? - if current_leaves.contains(pdu.event_id()) { - error!("The incoming event is already present in get_pdu_leaves BUG"); - is_incoming_leaf = false; - // Not sure what to do here - } - - // If the incoming event is already referenced by an existing event - // then do nothing - it's not a candidate to be a new extremity if - // it has been referenced. - if db.rooms.is_pdu_referenced(pdu)? { - is_incoming_leaf = false; - } - - // TODO: - // [dendrite] Checks if any other leaves have been referenced and removes them - // but as long as we update the pdu leaves here and for events on our server this - // should not be possible. - - // Remove any forward extremities that are referenced by this incoming events prev_events - for incoming_leaf in &pdu.prev_events { - if current_leaves.contains(incoming_leaf) { - if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) { - current_leaves.remove(pos); - } - } - } - - // Add the incoming event only if it is a leaf, we do this after fetching all the - // state since we know we have already fetched the state of the incoming event so lets - // not do it again! - if is_incoming_leaf { - current_leaves.push(pdu.event_id().clone()); - } - - Ok(current_leaves) -} - -/// This should always be called after the incoming event has been appended to the DB. -/// -/// This guarantees that the incoming event will be in the state sets (at least our servers -/// and the sending server). 
-pub(crate) async fn build_forward_extremity_snapshots( - db: &Database, - pdu: Arc, - mut current_state: StateMap>, - current_leaves: &[EventId], -) -> Result>>> { - let current_shortstatehash = db.rooms.current_shortstatehash(pdu.room_id())?; - - let mut includes_current_state = false; - let mut fork_states = BTreeSet::new(); - for id in current_leaves { - if id == &pdu.event_id { - continue; - } - match db.rooms.get_pdu(id)? { - // We can skip this because it is handled outside of this function - // The current server state and incoming event state are built to be - // the state after. - // This would be the incoming state from the server. - Some(leaf_pdu) => { - let pdu_shortstatehash = db - .rooms - .pdu_shortstatehash(&leaf_pdu.event_id)? - .ok_or_else(|| { - warn!("Leaf pdu: {:?}", leaf_pdu); - Error::bad_database("Found pdu with no statehash in db.") - })?; - - if current_shortstatehash.as_ref() == Some(&pdu_shortstatehash) { - includes_current_state = true; - } - - let mut state_before = db - .rooms - .state_full(pdu_shortstatehash)? 
- .into_iter() - .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) - .collect::>(); - - // Now it's the state after - let key = (leaf_pdu.kind.clone(), leaf_pdu.state_key.clone()); - state_before.insert(key, Arc::new(leaf_pdu)); - - fork_states.insert(state_before); - } - _ => { - error!("Missing state snapshot for {:?}", id); - return Err(Error::bad_database("Missing state snapshot.")); - } - } - } - - // This guarantees that our current room state is included - if !includes_current_state { - current_state.insert((pdu.kind(), pdu.state_key()), pdu); - - fork_states.insert(current_state); - } - - Ok(fork_states) -} - -#[tracing::instrument(skip(db))] -pub(crate) fn update_resolved_state( - db: &Database, - room_id: &RoomId, - state: Option>>, -) -> Result<()> { - // Update the state of the room if needed - // We can tell if we need to do this based on wether state resolution took place or not - if let Some(state) = state { - let mut new_state = HashMap::new(); - for ((ev_type, state_k), pdu) in state { - new_state.insert( - ( - ev_type, - state_k.ok_or(Error::Conflict( - "update_resolved_state: State contained non state event", - ))?, - ), - pdu.event_id.clone(), - ); - } - - db.rooms.force_state(room_id, new_state, &db.globals)?; - } - - Ok(()) -} - /// Append the incoming event setting the state snapshot to the state from the /// server that sent the event. 
#[tracing::instrument(skip(db))] pub(crate) fn append_incoming_pdu( db: &Database, pdu: &PduEvent, - new_room_leaves: &[EventId], + new_room_leaves: HashSet, state: &StateMap>, ) -> Result<()> { - // Update the state of the room if needed - // We can tell if we need to do this based on wether state resolution took place or not - let mut new_state = HashMap::new(); - for ((ev_type, state_k), state_pdu) in state { - new_state.insert( - ( - ev_type.clone(), - state_k.clone().ok_or(Error::Conflict( - "append_incoming_pdu: State contained non state event", - ))?, - ), - state_pdu.event_id.clone(), - ); - } - - db.rooms - .force_state(pdu.room_id(), new_state, &db.globals)?; - let count = db.globals.next_count()?; let mut pdu_id = pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); @@ -1434,19 +1251,18 @@ pub(crate) fn append_incoming_pdu( // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. - let state_hash = db.rooms.append_to_state(&pdu, &db.globals)?; + db.rooms + .set_event_state(&pdu.event_id, state, &db.globals)?; db.rooms.append_pdu( pdu, utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), count, pdu_id.clone().into(), - &new_room_leaves, + &new_room_leaves.into_iter().collect::>(), &db, )?; - db.rooms.set_room_state(pdu.room_id(), state_hash)?; - for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces