From 96dc6be14b8065f8833b8372cfe1e3655d358f77 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Fri, 15 Jan 2021 15:46:47 -0500 Subject: [PATCH] Use the auth_events for step 6, WIP forward_extremity_ids fn --- src/server_server.rs | 159 ++++++++++++++++++++++++++++--------------- 1 file changed, 104 insertions(+), 55 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index ae59583..77f0fa8 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -569,7 +569,7 @@ pub async fn send_transaction_message_route<'a>( // discard the event whereas the Client Server API's /send/{eventType} endpoint // would return a M_BAD_JSON error. let mut resolved_map = BTreeMap::new(); - for pdu in &body.pdus { + 'main_pdu_loop: for pdu in &body.pdus { // 1. Is a valid event, otherwise it is dropped. // Ruma/PduEvent/StateEvent satisfies this // We do not add the event_id field to the pdu here because of signature and hashes checks @@ -660,7 +660,6 @@ pub async fn send_transaction_message_route<'a>( }; let pdu = Arc::new(pdu.clone()); - // Fetch any unknown prev_events or retrieve them from the DB let previous = match fetch_events( &db, @@ -675,6 +674,7 @@ pub async fn send_transaction_message_route<'a>( _ => None, }; + // [auth_cache] At this point we have the auth chain of the incoming event. let mut event_map: state_res::EventMap> = auth_cache .iter() .map(|(k, v)| (k.clone(), v.clone())) @@ -688,7 +688,7 @@ pub async fn send_transaction_message_route<'a>( &pdu.auth_events .iter() .map(|id| { - event_map + auth_cache .get(id) .map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone())) .ok_or_else(|| { @@ -790,7 +790,15 @@ pub async fn send_transaction_message_route<'a>( // End of step 5. // Gather the forward extremities and resolve - let fork_states = match forward_extremity_ids(&db, &pdu) { + let fork_states = match forward_extremity_ids( + &db, + &pdu, + server_name, + &pub_key_map, + &mut auth_cache, + ) + .await + { Ok(states) => states, Err(_) => { resolved_map.insert(event_id, Err("Failed to gather forward extremities".into())); @@ -805,47 +813,44 @@ pub async fn send_transaction_message_route<'a>( } else if fork_states.len() == 1 { fork_states[0].clone() } else { + // TODO: remove this is for current debugging Jan, 15 2021 + let mut number_fetches = 0_u32; let mut auth_events = vec![]; // this keeps track if we error so we can break out of these inner loops // to continue on with the incoming PDU's - let mut failed = false; for map in &fork_states { let mut state_auth = vec![]; - for pdu in map.values() { - let event = match auth_cache.get(pdu.event_id()) { + for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { + let event = match auth_cache.get(auth_id) { Some(aev) => aev.clone(), // We should know about every event at this point but just incase... None => match fetch_events( &db, server_name, &pub_key_map, - &[pdu.event_id().clone()], + &[auth_id.clone()], &mut auth_cache, ) .await - .map(|mut vec| vec.remove(0)) - { - Ok(aev) => aev.clone(), + .map(|mut vec| { + number_fetches += 1; + vec.remove(0) + }) { + Ok(aev) => aev, Err(_) => { resolved_map.insert( event_id.clone(), Err("Event has been soft failed".into()), ); - failed = true; - break; + continue 'main_pdu_loop; } }, }; state_auth.push(event); } - if failed { - break; - } auth_events.push(state_auth); } - if failed { - continue; - } + info!("{} event's were not in the auth_cache", number_fetches); // Add everything we will need to event_map event_map.extend( @@ -886,7 +891,13 @@ pub async fn send_transaction_message_route<'a>( .into_iter() .map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap()))) .collect(), - Err(e) => panic!("{:?}", e), + Err(_) => { + resolved_map.insert( + pdu.event_id().clone(), + Err("State resolution failed, either an event could not be found or deserialization".into()), + ); + continue 'main_pdu_loop; + } } }; @@ -914,6 +925,7 @@ pub async fn send_transaction_message_route<'a>( Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into()) } +/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events /// Validate any event that is given to us by another server. /// /// 1. Is a valid event, otherwise it is dropped (PduEvent deserialization satisfies this). @@ -955,6 +967,37 @@ fn validate_event<'a>( }) } +/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events +/// The check in `fetch_check_auth_events` is that a complete chain is found for the +/// events `auth_events`. If the chain is found to have any missing events it fails. +/// +/// The `auth_cache` is filled instead of returning a `Vec`. +async fn fetch_check_auth_events( + db: &Database, + origin: &ServerName, + key_map: &PublicKeyMap, + event_ids: &[EventId], + auth_cache: &mut EventMap>, +) -> Result<()> { + let mut stack = event_ids.to_vec(); + + // DFS for auth event chain + while !stack.is_empty() { + let ev_id = stack.pop().unwrap(); + if auth_cache.contains_key(&ev_id) { + continue; + } + + let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache) + .await + .map(|mut vec| vec.remove(0))?; + + stack.extend(ev.auth_events()); + auth_cache.insert(ev.event_id().clone(), ev); + } + Ok(()) +} + /// Find the event and auth it. /// /// 1. Look in the main timeline (pduid_pdu tree) @@ -1000,36 +1043,6 @@ async fn fetch_events( Ok(pdus) } -/// The check in `fetch_check_auth_events` is that a complete chain is found for the -/// events `auth_events`. If the chain is found to have any missing events it fails. -/// -/// The `auth_cache` is filled instead of returning a `Vec`. -async fn fetch_check_auth_events( - db: &Database, - origin: &ServerName, - key_map: &PublicKeyMap, - event_ids: &[EventId], - auth_cache: &mut EventMap>, -) -> Result<()> { - let mut stack = event_ids.to_vec(); - - // DFS for auth event chain - while !stack.is_empty() { - let ev_id = stack.pop().unwrap(); - if auth_cache.contains_key(&ev_id) { - continue; - } - - let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache) - .await - .map(|mut vec| vec.remove(0))?; - - stack.extend(ev.auth_events()); - auth_cache.insert(ev.event_id().clone(), ev); - } - Ok(()) -} - /// Search the DB for the signing keys of the given server, if we don't have them /// fetch them from the server and save to our DB. async fn fetch_signing_keys( @@ -1049,6 +1062,7 @@ async fn fetch_signing_keys( } } } + fn signature_and_hash_check( pub_key_map: &ruma::signatures::PublicKeyMap, value: CanonicalJsonObject, @@ -1073,9 +1087,23 @@ fn signature_and_hash_check( ) } -fn forward_extremity_ids(db: &Database, pdu: &PduEvent) -> Result>>> { +async fn forward_extremity_ids( + db: &Database, + pdu: &PduEvent, + origin: &ServerName, + pub_key_map: &PublicKeyMap, + auth_cache: &mut EventMap>, +) -> Result>>> { + let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; + + for incoming_leaf in &pdu.prev_events { + if !current_leaves.contains(incoming_leaf) { + current_leaves.push(incoming_leaf.clone()); + } + } + let mut fork_states = vec![]; - for id in &db.rooms.get_pdu_leaves(pdu.room_id())? { + for id in ¤t_leaves { if let Some(id) = db.rooms.get_pdu_id(id)? { let state_hash = db .rooms @@ -1090,11 +1118,32 @@ fn forward_extremity_ids(db: &Database, pdu: &PduEvent) -> Result