From 74d530ae0eff76bbdd7a130cd17a645b5455676f Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Wed, 3 Feb 2021 20:00:01 -0500 Subject: [PATCH] Address review issues, fix forward extremity calc Keep track of all prev_events since if we know that an event is a prev_event it is referenced and does not qualify as a forward extremity. --- src/client_server/push.rs | 5 +- src/database.rs | 3 +- src/database/globals.rs | 8 +- src/database/rooms.rs | 79 +++++---- src/server_server.rs | 341 ++++++++++++++++++-------------------- 5 files changed, 220 insertions(+), 216 deletions(-) diff --git a/src/client_server/push.rs b/src/client_server/push.rs index 667d667..7c3e9d9 100644 --- a/src/client_server/push.rs +++ b/src/client_server/push.rs @@ -93,7 +93,10 @@ pub async fn get_pushrule_route( if let Some(rule) = rule { Ok(get_pushrule::Response { rule }.into()) } else { - Err(Error::BadRequest(ErrorKind::NotFound, "Push rule not found.").into()) + Err(Error::BadRequest( + ErrorKind::NotFound, + "Push rule not found.", + )) } } diff --git a/src/database.rs b/src/database.rs index b841ab9..3fb8442 100644 --- a/src/database.rs +++ b/src/database.rs @@ -159,7 +159,8 @@ impl Database { stateid_pduid: db.open_tree("stateid_pduid")?, pduid_statehash: db.open_tree("pduid_statehash")?, roomid_statehash: db.open_tree("roomid_statehash")?, - eventid_outlierpdu: db.open_tree("eventid_outlierpdu")?, + roomeventid_outlierpdu: db.open_tree("roomeventid_outlierpdu")?, + prevevent_parent: db.open_tree("prevevent_parent")?, }, account_data: account_data::AccountData { roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/globals.rs b/src/database/globals.rs index 2ed6a9f..00b4568 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -27,7 +27,11 @@ pub struct Globals { } impl Globals { - pub fn load(globals: sled::Tree, server_keys: sled::Tree, config: Config) -> Result { + pub fn load( + globals: sled::Tree, + servertimeout_signingkey: sled::Tree, + config: Config, + ) -> Result { let bytes = &*globals .update_and_fetch("keypair", utils::generate_keypair)? .expect("utils::generate_keypair always returns Some"); @@ -84,7 +88,7 @@ impl Globals { })?, actual_destination_cache: Arc::new(RwLock::new(HashMap::new())), jwt_decoding_key, - servertimeout_signingkey: server_keys, + servertimeout_signingkey, }) } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 6ee29a6..abe8c65 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -68,7 +68,9 @@ pub struct Rooms { /// RoomId + EventId -> outlier PDU. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. - pub(super) eventid_outlierpdu: sled::Tree, + pub(super) roomeventid_outlierpdu: sled::Tree, + /// RoomId + EventId -> Parent PDU EventId. + pub(super) prevevent_parent: sled::Tree, } impl Rooms { @@ -92,7 +94,7 @@ impl Rooms { Some(b) => serde_json::from_slice::(&b) .map_err(|_| Error::bad_database("Invalid PDU in db.")), None => self - .eventid_outlierpdu + .roomeventid_outlierpdu .get(pduid)? .map(|b| { serde_json::from_slice::(&b) @@ -120,8 +122,6 @@ impl Rooms { } /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`). - /// - /// TODO: Should this check for outliers, it does now. pub fn state_get( &self, room_id: &RoomId, @@ -153,7 +153,7 @@ impl Rooms { Some(b) => serde_json::from_slice::(&b) .map_err(|_| Error::bad_database("Invalid PDU in db."))?, None => self - .eventid_outlierpdu + .roomeventid_outlierpdu .get(pdu_id)? .map(|b| { serde_json::from_slice::(&b) @@ -203,7 +203,7 @@ impl Rooms { &event_type, &state_key .as_deref() - .expect("found a non state event in auth events"), + .ok_or_else(|| Error::bad_database("Saved auth event with no state key."))?, )? { events.insert((event_type, state_key), pdu); } @@ -248,7 +248,7 @@ impl Rooms { let mut prefix = state_hash.to_vec(); prefix.push(0xff); - for ((event_type, state_key), pdu_id) in state { + for ((event_type, state_key), id_long) in state { let mut statekey = event_type.as_ref().as_bytes().to_vec(); statekey.push(0xff); statekey.extend_from_slice(&state_key.as_bytes()); @@ -266,7 +266,7 @@ impl Rooms { // Because of outliers this could also be an eventID but that // is handled by `state_full` - let pdu_id_short = pdu_id + let pdu_id_short = id_long .splitn(2, |&b| b == 0xff) .nth(1) .ok_or_else(|| Error::bad_database("Invalid pduid in state."))?; @@ -332,7 +332,7 @@ impl Rooms { serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { Some(b) => b, None => self - .eventid_outlierpdu + .roomeventid_outlierpdu .get(event_id.as_bytes())? .ok_or_else(|| { Error::bad_database("Event is not in pdu tree or outliers.") @@ -360,12 +360,10 @@ impl Rooms { Ok(Some( serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { Some(b) => b, - None => self - .eventid_outlierpdu - .get(event_id.as_bytes())? - .ok_or_else(|| { - Error::bad_database("Event is not in pdu tree or outliers.") - })?, + None => match self.roomeventid_outlierpdu.get(event_id.as_bytes())? { + Some(b) => b, + None => return Ok(None), + }, }) .map_err(|_| Error::bad_database("Invalid PDU in db."))?, )) @@ -373,6 +371,8 @@ impl Rooms { } /// Returns the pdu. + /// + /// This does __NOT__ check the outliers `Tree`. pub fn get_pdu_from_id(&self, pdu_id: &IVec) -> Result> { self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { Ok(Some( @@ -436,7 +436,7 @@ impl Rooms { /// Replace the leaves of a room. /// - /// The provided `event_ids` become the new leaves, this enables an event having multiple + /// The provided `event_ids` become the new leaves, this allows a room to have multiple /// `prev_events`. pub fn replace_pdu_leaves(&self, room_id: &RoomId, event_ids: &[EventId]) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); @@ -455,31 +455,42 @@ impl Rooms { Ok(()) } + pub fn is_pdu_referenced(&self, pdu: &PduEvent) -> Result { + let mut key = pdu.room_id().as_bytes().to_vec(); + key.extend_from_slice(pdu.event_id().as_bytes()); + self.prevevent_parent.contains_key(key).map_err(Into::into) + } + /// Returns the pdu from the outlier tree. pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result> { - self.eventid_outlierpdu + self.roomeventid_outlierpdu .get(event_id.as_bytes())? .map_or(Ok(None), |pdu| { serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) }) } - /// Returns true if the event_id was previously inserted. - pub fn append_pdu_outlier(&self, pdu: &PduEvent) -> Result { - log::info!("Number of outlier pdu's {}", self.eventid_outlierpdu.len()); + /// Append the PDU as an outlier. + /// + /// Any event given to this will be processed (state-res) on another thread. + pub fn append_pdu_outlier(&self, pdu: &PduEvent) -> Result<()> { + log::info!( + "Number of outlier pdu's {}", + self.roomeventid_outlierpdu.len() + ); let mut key = pdu.room_id().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(pdu.event_id().as_bytes()); - let res = self - .eventid_outlierpdu - .insert( - &key, - &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), - ) - .map(|op| op.is_some())?; - Ok(res) + self.eventid_pduid + .insert(pdu.event_id().as_bytes(), key.as_slice())?; + + self.roomeventid_outlierpdu.insert( + &key, + &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), + )?; + Ok(()) } /// Creates a new persisted data unit and adds it to a room. @@ -526,7 +537,15 @@ impl Rooms { let mut key = pdu.room_id().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(pdu.event_id().as_bytes()); - self.eventid_outlierpdu.remove(key)?; + self.roomeventid_outlierpdu.remove(key)?; + + // We must keep track of all events that have been referenced. + for leaf in leaves { + let mut key = pdu.room_id().as_bytes().to_vec(); + key.extend_from_slice(leaf.as_bytes()); + self.prevevent_parent + .insert(key, pdu.event_id().as_bytes())?; + } self.replace_pdu_leaves(&pdu.room_id, leaves)?; @@ -541,6 +560,8 @@ impl Rooms { .expect("CanonicalJsonObject is always a valid String"), )?; + // This also replaces the eventid of any outliers with the correct + // pduid, removing the place holder. self.eventid_pduid .insert(pdu.event_id.as_bytes(), &*pdu_id)?; diff --git a/src/server_server.rs b/src/server_server.rs index 2cfbc6e..48d5956 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -571,8 +571,6 @@ pub async fn send_transaction_message_route<'a>( } // If we know of this pdu we don't need to continue processing it - // - // This check is essentially if let Ok(Some(_)) = db.rooms.get_pdu_id(&event_id) { return None; } @@ -664,64 +662,66 @@ pub async fn send_transaction_message_route<'a>( // the checks in this list starting at 1. These are not timeline events. // // Step 10. check the auth of the event passes based on the calculated state of the event - let (mut state_at_event, incoming_auth_events): ( - StateMap>, - Vec>, - ) = match db - .sending - .send_federation_request( - &db.globals, - server_name, - get_room_state_ids::v1::Request { - room_id: pdu.room_id(), - event_id: pdu.event_id(), - }, - ) - .await - { - Ok(res) => { - let state = fetch_events( - &db, + // + // TODO: if we know the prev_events of the incoming event we can avoid the request and build + // the state from a known point and resolve if > 1 prev_event + let (state_at_event, incoming_auth_events): (StateMap>, Vec>) = + match db + .sending + .send_federation_request( + &db.globals, server_name, - &pub_key_map, - &res.pdu_ids, - &mut auth_cache, + get_room_state_ids::v1::Request { + room_id: pdu.room_id(), + event_id: pdu.event_id(), + }, ) - .await?; - // Sanity check: there are no conflicting events in the state we received - let mut seen = BTreeSet::new(); - for ev in &state { - // If the key is already present - if !seen.insert((&ev.kind, &ev.state_key)) { - todo!("Server sent us an invalid state") - } - } - - let state = state - .into_iter() - .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) - .collect(); - - ( - state, - fetch_events( + .await + { + Ok(res) => { + let state = fetch_events( &db, server_name, &pub_key_map, - &res.auth_chain_ids, + &res.pdu_ids, &mut auth_cache, ) - .await?, - ) - } - Err(_) => { - resolved_map.insert( - pdu.event_id().clone(), - Err("Fetching state for event failed".into()), - ); - continue; - } - }; + .await?; + // Sanity check: there are no conflicting events in the state we received + let mut seen = BTreeSet::new(); + for ev in &state { + // If the key is already present + if !seen.insert((&ev.kind, &ev.state_key)) { + error!("Server sent us an invalid state"); + continue; + } + } + + let state = state + .into_iter() + .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) + .collect(); + + ( + state, + fetch_events( + &db, + server_name, + &pub_key_map, + &res.auth_chain_ids, + &mut auth_cache, + ) + .await?, + ) + } + Err(_) => { + resolved_map.insert( + pdu.event_id().clone(), + Err("Fetching state for event failed".into()), + ); + continue; + } + }; // 10. This is the actual auth check for state at the event if !state_res::event_auth::auth_check( @@ -764,6 +764,7 @@ pub async fn send_transaction_message_route<'a>( pdu.event_id().clone(), Err("Event has been soft failed".into()), ); + continue; }; // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res @@ -779,10 +780,6 @@ pub async fn send_transaction_message_route<'a>( } }; - // Now that the event has passed all auth it is added into the timeline, we do have to - // find the leaves otherwise we would do this sooner - append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; - // This will create the state after any state snapshot it builds // So current_state will have the incoming event inserted to it let mut fork_states = match build_forward_extremity_snapshots( @@ -805,10 +802,11 @@ pub async fn send_transaction_message_route<'a>( // Make this the state after (since we appended_incoming_pdu this should agree with our servers // current state). - state_at_event.insert((pdu.kind(), pdu.state_key()), pdu.clone()); - // add the incoming events to the mix of state snapshots + let mut state_after = state_at_event.clone(); + state_after.insert((pdu.kind(), pdu.state_key()), pdu.clone()); + // Add the incoming event to the mix of state snapshots // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets - fork_states.insert(state_at_event.clone()); + fork_states.insert(state_after.clone()); let fork_states = fork_states.into_iter().collect::>(); @@ -826,39 +824,27 @@ pub async fn send_transaction_message_route<'a>( update_state = true; // TODO: remove this is for current debugging Jan, 15 2021 - let mut number_fetches = 0_u32; let mut auth_events = vec![]; for map in &fork_states { let mut state_auth = vec![]; for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { let event = match auth_cache.get(auth_id) { Some(aev) => aev.clone(), - // We should know about every event at this point but just incase... - None => match fetch_events( - &db, - server_name, - &pub_key_map, - &[auth_id.clone()], - &mut auth_cache, - ) - .await - .map(|mut vec| { - number_fetches += 1; - vec.pop() - }) { - Ok(Some(aev)) => aev, - _ => { - resolved_map - .insert(event_id.clone(), Err("Failed to fetch event".into())); - continue 'main_pdu_loop; - } - }, + // The only events that haven't been added to the auth cache are + // events we have knowledge of previously + None => { + error!("Event was not present in auth_cache {}", auth_id); + resolved_map.insert( + event_id.clone(), + Err("Event was not present in auth cache".into()), + ); + continue 'main_pdu_loop; + } }; state_auth.push(event); } auth_events.push(state_auth); } - info!("{} event's were not in the auth_cache", number_fetches); // Add everything we will need to event_map auth_cache.extend( @@ -873,7 +859,7 @@ pub async fn send_transaction_message_route<'a>( .map(|pdu| (pdu.event_id().clone(), pdu)), ); auth_cache.extend( - state_at_event + state_after .into_iter() .map(|(_, pdu)| (pdu.event_id().clone(), pdu)), ); @@ -911,17 +897,12 @@ pub async fn send_transaction_message_route<'a>( let pdu = match auth_cache.get(&id) { Some(pdu) => pdu.clone(), None => { - match fetch_events(&db, server_name, &pub_key_map, &[id], &mut auth_cache) - .await - .map(|mut vec| vec.pop()) - { - Ok(Some(aev)) => aev, - _ => { - resolved_map - .insert(event_id.clone(), Err("Failed to fetch event".into())); - continue 'main_pdu_loop; - } - } + error!("Event was not present in auth_cache {}", id); + resolved_map.insert( + event_id.clone(), + Err("Event was not present in auth cache".into()), + ); + continue 'main_pdu_loop; } }; resolved.insert(k, pdu); @@ -929,7 +910,12 @@ pub async fn send_transaction_message_route<'a>( resolved }; - // Add the event to the DB and update the forward extremities (via roomid_pduleaves). + // Now that the event has passed all auth it is added into the timeline. + // We use the `state_at_event` instead of `state_after` so we accurately + // represent the state for this event. + append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; + + // Set the new room state to the resolved state update_resolved_state( &db, pdu.room_id(), @@ -1046,8 +1032,6 @@ fn validate_event<'a>( /// TODO: don't add as outlier if event is fetched as a result of gathering auth_events /// The check in `fetch_check_auth_events` is that a complete chain is found for the /// events `auth_events`. If the chain is found to have any missing events it fails. -/// -/// The `auth_cache` is filled instead of returning a `Vec`. async fn fetch_check_auth_events( db: &Database, origin: &ServerName, @@ -1073,7 +1057,6 @@ async fn fetch_check_auth_events( })??; stack.extend(ev.auth_events()); - auth_cache.insert(ev.event_id().clone(), ev); } Ok(()) } @@ -1085,6 +1068,9 @@ async fn fetch_check_auth_events( /// 2. Look at outlier pdu tree /// 3. Ask origin server over federation /// 4. TODO: Ask other servers over federation? +/// +/// If the event is unknown to the `auth_cache` it is added. This guarantees that any +/// event we need to know of will be present. pub(crate) async fn fetch_events( db: &Database, origin: &ServerName, @@ -1118,6 +1104,7 @@ pub(crate) async fn fetch_events( Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), }, }; + auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone()); pdus.push(pdu); } Ok(pdus) @@ -1167,13 +1154,9 @@ pub(crate) async fn calculate_forward_extremities( // If the incoming event is already referenced by an existing event // then do nothing - it's not a candidate to be a new extremity if // it has been referenced. - // - // We check this in the filter just before the main incoming PDU for loop - // so no already known event can make it this far. - // - // if db.rooms.get_pdu_id(pdu.event_id())?.is_some() { - // is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some(); - // } + if db.rooms.is_pdu_referenced(pdu)? { + is_incoming_leaf = false; + } // TODO: // [dendrite] Checks if any other leaves have been referenced and removes them @@ -1217,74 +1200,79 @@ pub(crate) async fn build_forward_extremity_snapshots( let mut includes_current_state = false; let mut fork_states = BTreeSet::new(); for id in current_leaves { - if let Some(id) = db.rooms.get_pdu_id(id)? { - let state_hash = db - .rooms - .pdu_state_hash(&id)? - .expect("found pdu with no statehash"); + match db.rooms.get_pdu_id(id)? { + // We can skip this because it is handled outside of this function + // The current server state and incoming event state are built to be + // the state after. + // This would be the incoming state from the server. + Some(_) if id == pdu.event_id() => {} + Some(pduid) if db.rooms.get_pdu_from_id(&pduid)?.is_some() => { + let state_hash = db + .rooms + .pdu_state_hash(&pduid)? + .expect("found pdu with no statehash"); - if current_hash.as_ref() == Some(&state_hash) { - includes_current_state = true; + if current_hash.as_ref() == Some(&state_hash) { + includes_current_state = true; + } + + let mut state_before = db + .rooms + .state_full(pdu.room_id(), &state_hash)? + .into_iter() + .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) + .collect::>(); + + // Now it's the state after + if let Some(pdu) = db.rooms.get_pdu_from_id(&pduid)? { + let key = (pdu.kind.clone(), pdu.state_key()); + state_before.insert(key, Arc::new(pdu)); + } + + fork_states.insert(state_before); } + _ => { + error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind()); - let mut state_before = db - .rooms - .state_full(pdu.room_id(), &state_hash)? - .into_iter() - .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) - .collect::>(); + let res = db + .sending + .send_federation_request( + &db.globals, + origin, + get_room_state_ids::v1::Request { + room_id: pdu.room_id(), + event_id: id, + }, + ) + .await?; - // Now it's the state after - if let Some(pdu) = db.rooms.get_pdu_from_id(&id)? { - let key = (pdu.kind.clone(), pdu.state_key()); - state_before.insert(key, Arc::new(pdu)); + // TODO: This only adds events to the auth_cache, there is for sure a better way to + // do this... + fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?; + + let mut state_before = + fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache) + .await? + .into_iter() + .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) + .collect::>(); + + if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache) + .await? + .pop() + { + let key = (pdu.kind.clone(), pdu.state_key()); + state_before.insert(key, pdu); + } + + // Now it's the state after + fork_states.insert(state_before); } - - fork_states.insert(state_before); - } else if id == pdu.event_id() { - // We add this snapshot after `build_forward_extremity_snapshots` is - // called which we requested from the sending server - } else { - error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind()); - - let res = db - .sending - .send_federation_request( - &db.globals, - origin, - get_room_state_ids::v1::Request { - room_id: pdu.room_id(), - event_id: id, - }, - ) - .await?; - - // TODO: This only adds events to the auth_cache, there is for sure a better way to - // do this... - fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?; - - let mut state_before = fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache) - .await? - .into_iter() - .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) - .collect::>(); - - if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache) - .await? - .pop() - { - let key = (pdu.kind.clone(), pdu.state_key()); - state_before.insert(key, pdu); - } - - // Now it's the state after - fork_states.insert(state_before); } } // This guarantees that our current room state is included - if !includes_current_state && current_hash.is_some() { - error!("Did not include current state"); + if !includes_current_state { current_state.insert((pdu.kind(), pdu.state_key()), pdu); fork_states.insert(current_state); @@ -1316,18 +1304,7 @@ pub(crate) fn update_resolved_state( ); } None => { - let mut pduid = pdu.room_id().as_bytes().to_vec(); - pduid.push(0xff); - pduid.extend_from_slice(pdu.event_id().as_bytes()); - new_state.insert( - ( - ev_type, - state_k.ok_or_else(|| { - Error::Conflict("State contained non state event") - })?, - ), - pduid, - ); + error!("We are missing a state event for the current room state."); } } } @@ -1349,9 +1326,9 @@ pub(crate) fn append_incoming_pdu( // Update the state of the room if needed // We can tell if we need to do this based on wether state resolution took place or not let mut new_state = HashMap::new(); - for ((ev_type, state_k), pdu) in state { - match db.rooms.get_pdu_id(pdu.event_id())? { - Some(pduid) => { + for ((ev_type, state_k), state_pdu) in state { + match db.rooms.get_pdu_id(state_pdu.event_id())? { + Some(state_pduid) => { new_state.insert( ( ev_type.clone(), @@ -1359,12 +1336,10 @@ pub(crate) fn append_incoming_pdu( .clone() .ok_or_else(|| Error::Conflict("State contained non state event"))?, ), - pduid.to_vec(), + state_pduid.to_vec(), ); } - None => { - error!("We didn't append an event as an outlier\n{:?}", pdu); - } + None => error!("We are missing a state event for the incoming event snapshot"), } }