From 1d465699296837fe147f855d95fcfef57775a3a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 14 Aug 2021 21:30:14 +0200 Subject: [PATCH] fix: don't use recursion for prev events --- src/server_server.rs | 228 ++++++++++++++++++++++++------------------- 1 file changed, 129 insertions(+), 99 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index cbbb850..5b2acd0 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -840,7 +840,7 @@ type AsyncRecursiveType<'a, T> = Pin + 'a + Send>>; /// 14. Use state resolution to find new room state // We use some AsyncRecursiveType hacks here so we can call this async funtion recursively #[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] -pub fn handle_incoming_pdu<'a>( +pub async fn handle_incoming_pdu<'a>( origin: &'a ServerName, event_id: &'a EventId, room_id: &'a RoomId, @@ -848,22 +848,76 @@ pub fn handle_incoming_pdu<'a>( is_timeline_event: bool, db: &'a Database, pub_key_map: &'a RwLock>>, -) -> AsyncRecursiveType<'a, StdResult>, String>> { +) -> StdResult>, String> { + match db.rooms.exists(&room_id) { + Ok(true) => {} + _ => { + return Err("Room is unknown to this server.".to_string()); + } + } + + // 1. Skip the PDU if we already have it as a timeline event + if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(&event_id) { + return Ok(Some(pdu_id.to_vec())); + } + + let create_event = db + .rooms + .room_state_get(&room_id, &EventType::RoomCreate, "") + .map_err(|_| "Failed to ask database for event.".to_owned())? + .ok_or_else(|| "Failed to find create event in db.".to_owned())?; + + let (incoming_pdu, val) = handle_outlier_pdu(origin, &create_event, event_id, room_id, value, db, pub_key_map).await?; + + // 8. if not timeline event: stop + if !is_timeline_event + || incoming_pdu.origin_server_ts + < db.rooms + .first_pdu_in_room(&room_id) + .map_err(|_| "Error loading first room event.".to_owned())? + .expect("Room exists") + .origin_server_ts + { + return Ok(None); + } + + // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events + let mut todo_outlier_stack = incoming_pdu.prev_events.clone(); + let mut todo_timeline_stack = Vec::new(); + while let Some(prev_event_id) = todo_outlier_stack.pop() { + if let Some((pdu, Some(json))) = fetch_and_handle_outliers( + db, + origin, + &[prev_event_id], + &create_event, + &room_id, + pub_key_map, + ) + .await.pop() { + todo_timeline_stack.push((pdu, json)); + } + } + + while let Some(prev) = todo_timeline_stack.pop() { + upgrade_outlier_to_timeline_pdu(prev.0, prev.1, &create_event, origin, db, room_id, pub_key_map).await?; + } + + upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, db, room_id, pub_key_map).await +} + +fn handle_outlier_pdu<'a>( + origin: &'a ServerName, + create_event: &'a PduEvent, + event_id: &'a EventId, + room_id: &'a RoomId, + value: BTreeMap, + db: &'a Database, + pub_key_map: &'a RwLock>>, +) -> AsyncRecursiveType<'a, StdResult<(Arc, BTreeMap), String>> { Box::pin(async move { let start_time = Instant::now(); // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json - match db.rooms.exists(&room_id) { - Ok(true) => {} - _ => { - return Err("Room is unknown to this server.".to_string()); - } - } - - // 1. Skip the PDU if we already have it as a timeline event - if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(&event_id) { - return Ok(Some(pdu_id.to_vec())); - } // We go through all the signatures we see on the value and fetch the corresponding signing // keys @@ -873,17 +927,12 @@ pub fn handle_incoming_pdu<'a>( // 2. Check signatures, otherwise drop // 3. check content hash, redact if doesn't match - let create_event = db - .rooms - .room_state_get(&room_id, &EventType::RoomCreate, "") - .map_err(|_| "Failed to ask database for event.".to_owned())? - .ok_or_else(|| "Failed to find create event in db.".to_owned())?; - let create_event_content = - serde_json::from_value::>(create_event.content.clone()) - .expect("Raw::from_value always works.") - .deserialize() - .map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; + let create_event_content = + serde_json::from_value::>(create_event.content.clone()) + .expect("Raw::from_value always works.") + .deserialize() + .map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; let room_version_id = &create_event_content.room_version; let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); @@ -924,13 +973,13 @@ pub fn handle_incoming_pdu<'a>( // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" // EDIT: Step 5 is not applied anymore because it failed too often debug!("Fetching auth events for {}", incoming_pdu.event_id); - fetch_and_handle_events( + fetch_and_handle_outliers( db, origin, &incoming_pdu.auth_events, + &create_event, &room_id, pub_key_map, - false, ) .await; @@ -1013,37 +1062,20 @@ pub fn handle_incoming_pdu<'a>( .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; debug!("Added pdu as outlier."); - // 8. if not timeline event: stop - if !is_timeline_event - || incoming_pdu.origin_server_ts - < db.rooms - .first_pdu_in_room(&room_id) - .map_err(|_| "Error loading first room event.".to_owned())? - .expect("Room exists") - .origin_server_ts - { - let elapsed = start_time.elapsed(); - warn!( - "Handling outlier event {} took {}m{}s", - event_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); - return Ok(None); - } + Ok((incoming_pdu,val)) + }) - // TODO: make not recursive - // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events - fetch_and_handle_events( - db, - origin, - &incoming_pdu.prev_events, - &room_id, - pub_key_map, - true, - ) - .await; +} +async fn upgrade_outlier_to_timeline_pdu( + incoming_pdu: Arc, + val: BTreeMap, + create_event: &PduEvent, + origin: &ServerName, + db: &Database, + room_id: &RoomId, + pub_key_map: &RwLock>>, +) -> StdResult>, String> { // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities // doing all the checks in this list starting at 1. These are not timeline events. @@ -1065,17 +1097,17 @@ pub fn handle_incoming_pdu<'a>( if let Some(Ok(state)) = state { warn!("Using cached state"); - let mut state = fetch_and_handle_events( + let mut state = fetch_and_handle_outliers( db, origin, &state.into_iter().collect::>(), + &create_event, &room_id, pub_key_map, - false, ) .await .into_iter() - .map(|pdu| { + .map(|(pdu,_)| { ( ( pdu.kind.clone(), @@ -1119,18 +1151,18 @@ pub fn handle_incoming_pdu<'a>( { Ok(res) => { debug!("Fetching state events at event."); - let state_vec = fetch_and_handle_events( + let state_vec = fetch_and_handle_outliers( &db, origin, &res.pdu_ids, + &create_event, &room_id, pub_key_map, - false, ) .await; let mut state = HashMap::new(); - for pdu in state_vec { + for (pdu, _) in state_vec { match state.entry((pdu.kind.clone(), pdu.state_key.clone().ok_or_else(|| "Found non-state pdu in state events.".to_owned())?)) { Entry::Vacant(v) => { v.insert(pdu); @@ -1153,13 +1185,13 @@ pub fn handle_incoming_pdu<'a>( } debug!("Fetching auth chain events at event."); - fetch_and_handle_events( + fetch_and_handle_outliers( &db, origin, &res.auth_chain_ids, + &create_event, &room_id, pub_key_map, - false, ) .await; @@ -1175,6 +1207,28 @@ pub fn handle_incoming_pdu<'a>( state_at_incoming_event.expect("we always set this to some above"); // 11. Check the auth of the event passes based on the state of the event + let create_event_content = + serde_json::from_value::>(create_event.content.clone()) + .expect("Raw::from_value always works.") + .deserialize() + .map_err(|_| "Invalid PowerLevels event in db.".to_owned())?; + + let room_version_id = &create_event_content.room_version; + let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); + + // If the previous event was the create event special rules apply + let previous_create = if incoming_pdu.auth_events.len() == 1 + && incoming_pdu.prev_events == incoming_pdu.auth_events + { + db.rooms + .get_pdu(&incoming_pdu.auth_events[0]) + .map_err(|e| e.to_string())? + .filter(|maybe_create| **maybe_create == *create_event) + } else { + None + }; + + if !state_res::event_auth::auth_check( &room_version, &incoming_pdu, @@ -1396,34 +1450,27 @@ pub fn handle_incoming_pdu<'a>( // Event has passed all auth/stateres checks drop(state_lock); - - let elapsed = start_time.elapsed(); - warn!( - "Handling timeline event {} took {}m{}s", - event_id, - elapsed.as_secs() / 60, - elapsed.as_secs() % 60 - ); Ok(pdu_id) - }) } /// Find the event and auth it. Once the event is validated (steps 1 - 8) /// it is appended to the outliers Tree. /// +/// Returns pdu and if we fetched it over federation the raw json. +/// /// a. Look in the main timeline (pduid_pdu tree) /// b. Look at outlier pdu tree /// c. Ask origin server over federation /// d. TODO: Ask other servers over federation? //#[tracing::instrument(skip(db, key_map, auth_cache))] -pub(crate) fn fetch_and_handle_events<'a>( +pub(crate) fn fetch_and_handle_outliers<'a>( db: &'a Database, origin: &'a ServerName, events: &'a [EventId], + create_event: &'a PduEvent, room_id: &'a RoomId, pub_key_map: &'a RwLock>>, - are_timeline_events: bool, -) -> AsyncRecursiveType<'a, Vec>> { +) -> AsyncRecursiveType<'a, Vec<(Arc, Option>)>> { Box::pin(async move { let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { Entry::Vacant(e) => { @@ -1449,16 +1496,12 @@ pub(crate) fn fetch_and_handle_events<'a>( // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree - // (get_pdu checks both) - let local_pdu = if are_timeline_events { - db.rooms.get_non_outlier_pdu(&id).map(|o| o.map(Arc::new)) - } else { - db.rooms.get_pdu(&id) - }; + // (get_pdu_json checks both) + let local_pdu = db.rooms.get_pdu(&id); let pdu = match local_pdu { Ok(Some(pdu)) => { trace!("Found {} in db", id); - pdu + (pdu, None) } Ok(None) => { // c. Ask origin server over federation @@ -1474,39 +1517,26 @@ pub(crate) fn fetch_and_handle_events<'a>( { Ok(res) => { debug!("Got {} over federation", id); - let (event_id, mut value) = + let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&res.pdu) { Ok(t) => t, Err(_) => continue, }; // This will also fetch the auth chain - match handle_incoming_pdu( + match handle_outlier_pdu( origin, + create_event, &event_id, &room_id, value.clone(), - are_timeline_events, db, pub_key_map, ) .await { - Ok(_) => { - value.insert( - "event_id".to_owned(), - CanonicalJsonValue::String(event_id.into()), - ); - - Arc::new( - serde_json::from_value( - serde_json::to_value(value) - .expect("canonicaljsonobject is valid value"), - ) - .expect( - "This is possible because handle_incoming_pdu worked", - ), - ) + Ok((pdu, json)) => { + (pdu, Some(json)) } Err(e) => { warn!("Authentication of event {} failed: {:?}", id, e);