From db8a0c5d69ada292879a8356e10eb96c6f961c8f Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Sun, 8 Nov 2020 14:44:02 -0500 Subject: [PATCH] Add closest_parent method to Rooms Db insert in order /send pdus --- Cargo.lock | 1 + Cargo.toml | 8 +- src/database/rooms.rs | 57 +++++++- src/server_server.rs | 296 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 334 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 461972b..3f8703e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2258,6 +2258,7 @@ checksum = "7345c971d1ef21ffdbd103a75990a15eb03604fc8b8852ca8cb418ee1a099028" [[package]] name = "state-res" version = "0.1.0" +source = "git+https://github.com/ruma/state-res?branch=timo-spec-comp#1cd1a16cdefabb126a781a50b3d5eb1fdb3d3afb" dependencies = [ "itertools", "js_int", diff --git a/Cargo.toml b/Cargo.toml index a6f89b9..b72b92f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client- # Used when doing state resolution # state-res = { git = "https://github.com/timokoesters/state-res", branch = "spec-comp", features = ["unstable-pre-spec"] } -state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } +state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec", "gen-eventid"] } # state-res = { path = "../../state-res", features = ["unstable-pre-spec"] } # Used for long polling @@ -78,6 +78,6 @@ path = "src/lib.rs" # ruma = { path = "../ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"] } # # ruma = { git = "https://github.com/ruma/ruma", rev = "64b9c646d15a359d62ab464a95176ff94adb2554", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"]} -[patch."https://github.com/ruma/state-res"] -state-res = { path = "../../state-res", features = ["unstable-pre-spec"] } -# state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp" } \ No newline at end of file +# [patch."https://github.com/ruma/state-res"] +# state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } +# # state-res = { git = "https://github.com/ruma/state-res", branch = "timo-spec-comp" } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index d9238a9..28e1f60 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -35,6 +35,11 @@ use super::admin::AdminCommand; /// hashing the entire state. pub type StateHashId = IVec; +pub enum ClosestParent { + Append, + Insert(u64), +} + #[derive(Clone)] pub struct Rooms { pub edus: edus::RoomEdus, @@ -74,7 +79,10 @@ impl StateStore for Rooms { .get_pdu_id(event_id) .map_err(StateError::custom)? .ok_or_else(|| { - StateError::NotFound("PDU via room_id and event_id not found in the db.".into()) + StateError::NotFound(format!( + "PDU via room_id and event_id not found in the db.\n{}", + event_id.as_str() + )) })?; serde_json::from_slice( @@ -395,6 +403,47 @@ impl Rooms { } } + pub fn get_closest_parent( + &self, + incoming_prev_ids: &[EventId], + their_state: &BTreeMap>, + ) -> Result> { + match self.pduid_pdu.last()? { + Some(val) + if incoming_prev_ids.contains( + &serde_json::from_slice::(&val.1) + .map_err(|_| { + Error::bad_database("last DB entry contains invalid PDU bytes") + })? + .event_id, + ) => + { + Ok(Some(ClosestParent::Append)) + } + _ => { + let mut prev_ids = incoming_prev_ids.to_vec(); + while let Some(id) = prev_ids.pop() { + match self.get_pdu_id(&id)? { + Some(pdu_id) => { + return Ok(Some(ClosestParent::Insert(self.pdu_count(&pdu_id)?))); + } + None => { + prev_ids.extend(their_state.get(&id).map_or( + Err(Error::BadServerResponse( + "Failed to find previous event for PDU in state", + )), + // `prev_event_ids` will return an empty Vec instead of failing + // so it works perfect for our use here + |pdu| Ok(pdu.prev_event_ids()), + )?); + } + } + } + Ok(None) + } + } + } + /// Returns the leaf pdus of a room. pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result> { let mut prefix = room_id.as_bytes().to_vec(); @@ -438,6 +487,9 @@ impl Rooms { #[allow(clippy::too_many_arguments)] /// Creates a new persisted data unit and adds it to a room. + /// + /// By this point the incoming event should be fully authenticated, no auth happens + /// in `append_pdu`. pub fn append_pdu( &self, pdu: &PduEvent, @@ -554,6 +606,7 @@ impl Rooms { self.stateid_pduid .scan_prefix(&prefix) .filter_map(|pdu| pdu.map_err(|e| error!("{}", e)).ok()) + // Chop the old state_hash out leaving behind the (EventType, StateKey) .map(|(k, v)| (k.subslice(prefix.len(), k.len() - prefix.len()), v)) .collect::>() } else { @@ -851,7 +904,7 @@ impl Rooms { .filter_map(|r| r.ok()) .filter(|server| &**server != globals.server_name()) { - sending.send_pdu(server, &pdu_id)?; + sending.send_pdu(&server, &pdu_id)?; } Ok(pdu.event_id) diff --git a/src/server_server.rs b/src/server_server.rs index 5b70780..41520fb 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,4 +1,7 @@ -use crate::{client_server, ConduitResult, Database, Error, PduEvent, Result, Ruma}; +use crate::{ + client_server, database::rooms::ClosestParent, utils, ConduitResult, Database, Error, PduEvent, + Result, Ruma, +}; use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; use log::warn; @@ -381,44 +384,266 @@ pub async fn send_transaction_message_route<'a>( } //dbg!(&*body); + // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we? + // SPEC: + // Servers MUST strictly enforce the JSON format specified in the appendices. + // This translates to a 400 M_BAD_JSON error on most endpoints, or discarding of + // events over federation. For example, the Federation API's /send endpoint would + // discard the event whereas the Client Server API's /send/{eventType} endpoint + // would return a M_BAD_JSON error. + let mut resolved_map = BTreeMap::new(); for pdu in &body.pdus { - let mut value = serde_json::from_str(pdu.json().get()) - .expect("converting raw jsons to values always works"); - - let event_id = EventId::try_from(&*format!( - "${}", - ruma::signatures::reference_hash(&value).expect("ruma can calculate reference hashes") - )) - .expect("ruma's reference hashes are valid event ids"); - - value - .as_object_mut() - .expect("ruma pdus are json objects") - .insert("event_id".to_owned(), event_id.to_string().into()); - + println!("LOOP"); + let (event_id, value) = process_incoming_pdu(pdu); let pdu = serde_json::from_value::(value.clone()) .expect("all ruma pdus are conduit pdus"); - if db.rooms.exists(&pdu.room_id)? { + let room_id = &pdu.room_id; + + if value.get("state_key").is_none() { + if !db.rooms.is_joined(&pdu.sender, &pdu.room_id)? { + // TODO: auth rules apply to all events, not only those with a state key + log::error!("Unauthorized {}", pdu.kind); + return Err(Error::BadRequest( + ruma::api::client::error::ErrorKind::Forbidden, + "Event is not authorized", + )); + } + + // TODO: We should be doing the same get_closest_parent thing here too? + // same as for state events ~100 lines down let count = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); - db.rooms.append_to_state(&pdu_id, &pdu)?; db.rooms.append_pdu( &pdu, &value, count, - pdu_id.clone().into(), + pdu_id.into(), &db.globals, &db.account_data, &db.admin, )?; + + resolved_map.insert(event_id, Ok::<(), String>(())); + continue; } + + let now = std::time::Instant::now(); + let get_state_response = match send_request( + &db.globals, + body.body.origin.clone(), + ruma::api::federation::event::get_room_state::v1::Request { + room_id, + event_id: &event_id, + }, + ) + .await + { + Ok(res) => res, + // We can't hard fail because there are some valid errors, just + // keep checking PDU's + // + // As an example a possible error + // {"errcode":"M_FORBIDDEN","error":"Host not in room."} + Err(err) => { + log::error!("Request failed: {}", err); + resolved_map.insert(event_id, Err(err.to_string())); + dbg!(now.elapsed()); + continue; + } + }; + dbg!(now.elapsed()); + + let their_current_state = get_state_response + .pdus + .iter() + .chain(get_state_response.auth_chain.iter()) // add auth events + .map(|pdu| { + let (event_id, json) = process_incoming_pdu(pdu); + ( + event_id.clone(), + std::sync::Arc::new( + // When creating a StateEvent the event_id arg will be used + // over any found in the json and it will not use ruma::reference_hash + // to generate one + state_res::StateEvent::from_id_value(event_id, json) + .expect("valid pdu json"), + ), + ) + }) + .collect::>(); + + if value.get("state_key").is_none() { + if !db.rooms.is_joined(&pdu.sender, &pdu.room_id)? { + // TODO: auth rules apply to all events, not only those with a state key + log::error!("Sender is not joined {}", pdu.kind); + + resolved_map.insert(event_id, Err("User is not in this room".into())); + continue; + } + + // // TODO: We should be doing the same get_closest_parent thing here too? + // // same as for state events ~100 lines down + // let count = db.globals.next_count()?; + // let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + // pdu_id.push(0xff); + // pdu_id.extend_from_slice(&count.to_be_bytes()); + // db.rooms.append_pdu( + // &pdu, + // &value, + // count, + // pdu_id.into(), + // &db.globals, + // &db.account_data, + // &db.sending, + // )?; + + // If the event is older than the last event in pduid_pdu Tree then find the + // closest ancestor we know of and insert after the known ancestor by + // altering the known events pduid to = same roomID + same count bytes + 0x1 + // pushing a single byte every time a simple append cannot be done. + match db + .rooms + .get_closest_parent(&pdu.prev_events, &their_current_state)? + { + Some(ClosestParent::Append) => { + let count = db.globals.next_count()?; + dbg!(&count); + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.sending, + )?; + } + Some(ClosestParent::Insert(old_count)) => { + let count = old_count; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + // Create a new count that is after old_count but before + // the pdu appended after + pdu_id.push(1); + + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.sending, + )?; + } + _ => panic!("Not a sequential event or no parents found"), + }; + resolved_map.insert(event_id, Ok::<(), String>(())); + continue; + } + + let our_current_state = db.rooms.room_state_full(room_id)?; + match state_res::StateResolution::resolve( + room_id, + &ruma::RoomVersionId::Version6, + &[ + our_current_state + .iter() + .map(|((ev, sk), v)| ((ev.clone(), sk.to_owned()), v.event_id.clone())) + .collect::>(), + // TODO we may not want the auth events chained in here for resolution? + their_current_state + .iter() + .map(|(_id, v)| ((v.kind(), v.state_key()), v.event_id().clone())) + .collect::>(), + ], + Some( + our_current_state + .iter() + .map(|(_k, v)| (v.event_id.clone(), v.convert_for_state_res())) + .chain( + their_current_state + .iter() + .map(|(id, ev)| (id.clone(), ev.clone())), + ) + .collect::>(), + ), + &db.rooms, + ) { + Ok(resolved) if resolved.values().any(|id| &event_id == id) => { + // If the event is older than the last event in pduid_pdu Tree then find the + // closest ancestor we know of and insert after the known ancestor by + // altering the known events pduid to = same roomID + same count bytes + 0x1 + // pushing a single byte every time a simple append cannot be done. + match db + .rooms + .get_closest_parent(&pdu.prev_events, &their_current_state)? + { + Some(ClosestParent::Append) => { + let count = db.globals.next_count()?; + dbg!(&count); + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.sending, + )?; + } + Some(ClosestParent::Insert(old_count)) => { + println!("INSERT PDU FOUND {}", old_count); + let count = old_count; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + // Create a new count that is after old_count but before + // the pdu appended after + pdu_id.push(1); + + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.into(), + &db.globals, + &db.account_data, + &db.sending, + )?; + } + _ => panic!("Not a sequential event or no parents found"), + } + + resolved_map.insert(event_id, Ok::<(), String>(())); + } + // If the eventId is not found in the resolved state auth has failed + Ok(_) => { + // TODO have state_res give the actual auth error in this case + resolved_map.insert( + event_id, + Err("This event failed authentication, not found in resolved set".into()), + ); + } + Err(e) => { + resolved_map.insert(event_id, Err(e.to_string())); + } + }; } - Ok(send_transaction_message::v1::Response { - pdus: BTreeMap::new(), - } - .into()) + + Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) } #[cfg_attr( @@ -529,3 +754,30 @@ pub fn get_user_devices_route<'a>( .into()) } */ + +/// Generates a correct eventId for the incoming pdu. +/// +/// Returns a `state_res::StateEvent` which can be converted freely and has accessor methods. +fn process_incoming_pdu(pdu: &ruma::Raw) -> (EventId, serde_json::Value) { + let mut value = serde_json::from_str(pdu.json().get()) + .expect("converting raw jsons to values always works"); + + let event_id = EventId::try_from(&*format!( + "${}", + ruma::signatures::reference_hash(&value, &RoomVersionId::Version6) + .expect("ruma can calculate reference hashes") + )) + .expect("ruma's reference hashes are valid event ids"); + + value.insert( + "event_id".to_owned(), + serde_json::json!(event_id) + .try_into() + .expect("EventId is a valid CanonicalJsonValue"), + ); + + ( + event_id, + serde_json::to_value(value).expect("JSON Value is a CanonicalJsonValue"), + ) +}