WIP gather and update forward extremities
This commit is contained in:
		
							parent
							
								
									94da569119
								
							
						
					
					
						commit
						7309b2fba9
					
				
					 3 changed files with 132 additions and 45 deletions
				
			
		|  | @ -397,6 +397,24 @@ impl Rooms { | |||
|         Ok(events) | ||||
|     } | ||||
| 
 | ||||
|     /// Force an update to the leaves of a room.
 | ||||
|     pub fn force_pdu_leaves(&self, room_id: &RoomId, event_ids: &[EventId]) -> Result<()> { | ||||
|         let mut prefix = room_id.as_bytes().to_vec(); | ||||
|         prefix.push(0xff); | ||||
| 
 | ||||
|         for key in self.roomid_pduleaves.scan_prefix(&prefix).keys() { | ||||
|             self.roomid_pduleaves.remove(key?)?; | ||||
|         } | ||||
| 
 | ||||
|         for event_id in event_ids.iter() { | ||||
|             let mut key = prefix.to_owned(); | ||||
|             key.extend_from_slice(event_id.as_bytes()); | ||||
|             self.roomid_pduleaves.insert(&key, event_id.as_bytes())?; | ||||
|         } | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Replace the leaves of a room with a new event.
 | ||||
|     pub fn replace_pdu_leaves(&self, room_id: &RoomId, event_id: &EventId) -> Result<()> { | ||||
|         let mut prefix = room_id.as_bytes().to_vec(); | ||||
|  |  | |||
							
								
								
									
										21
									
								
								src/pdu.rs
									
									
									
									
									
								
							
							
						
						
									
										21
									
								
								src/pdu.rs
									
									
									
									
									
								
							|  | @ -9,7 +9,7 @@ use ruma::{ | |||
| }; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use serde_json::json; | ||||
| use std::{collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH}; | ||||
| use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH}; | ||||
| 
 | ||||
| #[derive(Clone, Deserialize, Serialize, Debug)] | ||||
| pub struct PduEvent { | ||||
|  | @ -284,6 +284,25 @@ impl state_res::Event for PduEvent { | |||
|     } | ||||
| } | ||||
| 
 | ||||
| // These impl's allow us to dedup state snapshots when resolving state
 | ||||
| // for incoming events (federation/send/{txn}).
 | ||||
| impl Eq for PduEvent {} | ||||
| impl PartialEq for PduEvent { | ||||
|     fn eq(&self, other: &Self) -> bool { | ||||
|         self.event_id == other.event_id | ||||
|     } | ||||
| } | ||||
| impl PartialOrd for PduEvent { | ||||
|     fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||||
|         self.event_id.partial_cmp(&other.event_id) | ||||
|     } | ||||
| } | ||||
| impl Ord for PduEvent { | ||||
|     fn cmp(&self, other: &Self) -> Ordering { | ||||
|         self.event_id.cmp(&other.event_id) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Generates a correct eventId for the incoming pdu.
 | ||||
| ///
 | ||||
| /// Returns a tuple of the new `EventId` and the PDU as a `BTreeMap<String, CanonicalJsonValue>`.
 | ||||
|  |  | |||
|  | @ -5,7 +5,6 @@ use log::{error, info, warn}; | |||
| use rocket::{get, post, put, response::content::Json, State}; | ||||
| use ruma::{ | ||||
|     api::{ | ||||
|         client::r0::state, | ||||
|         federation::{ | ||||
|             directory::{get_public_rooms, get_public_rooms_filtered}, | ||||
|             discovery::{ | ||||
|  | @ -25,7 +24,7 @@ use ruma::{ | |||
| }; | ||||
| use state_res::{Event, EventMap, StateMap}; | ||||
| use std::{ | ||||
|     collections::{BTreeMap, BTreeSet}, | ||||
|     collections::{BTreeMap, BTreeSet, HashSet}, | ||||
|     convert::TryFrom, | ||||
|     fmt::Debug, | ||||
|     future::Future, | ||||
|  | @ -600,31 +599,21 @@ pub async fn send_transaction_message_route<'a>( | |||
| 
 | ||||
|         let server_name = &body.body.origin; | ||||
|         let mut pub_key_map = BTreeMap::new(); | ||||
|         if let Some(sig) = value.get("signatures") { | ||||
|             match sig { | ||||
|                 CanonicalJsonValue::Object(entity) => { | ||||
|                     for key in entity.keys() { | ||||
|                         // TODO: save this in a DB maybe...
 | ||||
|                         // fetch the public signing key
 | ||||
|                         let origin = <&ServerName>::try_from(key.as_str()).unwrap(); | ||||
|                         let keys = fetch_signing_keys(&db, origin).await?; | ||||
| 
 | ||||
|                         pub_key_map.insert( | ||||
|                             origin.to_string(), | ||||
|                             keys.into_iter() | ||||
|                                 .map(|(k, v)| (k.to_string(), v.key)) | ||||
|                                 .collect(), | ||||
|                         ); | ||||
|                     } | ||||
|                 } | ||||
|                 _ => { | ||||
|                     resolved_map.insert( | ||||
|                         event_id, | ||||
|                         Err("`signatures` is not a JSON object".to_string()), | ||||
|                     ); | ||||
|                     continue; | ||||
|                 } | ||||
|             } | ||||
|         if let Some(CanonicalJsonValue::String(sender)) = value.get("sender") { | ||||
|             let sender = | ||||
|                 UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field"); | ||||
|             let origin = sender.server_name(); | ||||
| 
 | ||||
|             // TODO: this could fail or the server not respond...
 | ||||
|             let keys = fetch_signing_keys(&db, origin).await?; | ||||
| 
 | ||||
|             pub_key_map.insert( | ||||
|                 origin.to_string(), | ||||
|                 keys.into_iter() | ||||
|                     .map(|(k, v)| (k.to_string(), v.key)) | ||||
|                     .collect(), | ||||
|             ); | ||||
|         } else { | ||||
|             resolved_map.insert(event_id, Err("No field `signatures` in JSON".to_string())); | ||||
|             continue; | ||||
|  | @ -642,8 +631,9 @@ pub async fn send_transaction_message_route<'a>( | |||
|         // 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
 | ||||
|         // 5. reject "due to auth events" if the event doesn't pass auth based on the auth events
 | ||||
|         // 7. if not timeline event: stop
 | ||||
|         // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 | ||||
|         let (pdu, previous) = match validate_event( | ||||
|         // TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 | ||||
|         // the events found in step 8 can be authed/resolved and appended to the DB
 | ||||
|         let (pdu, previous): (_, Vec<Arc<PduEvent>>) = match validate_event( | ||||
|             &db, | ||||
|             value, | ||||
|             event_id.clone(), | ||||
|  | @ -670,6 +660,9 @@ pub async fn send_transaction_message_route<'a>( | |||
|         // 6. persist the event as an outlier.
 | ||||
|         db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?; | ||||
| 
 | ||||
|         // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
 | ||||
|         // the checks in this list starting at 1. These are not timeline events.
 | ||||
|         //
 | ||||
|         // Step 10. check the auth of the event passes based on the calculated state of the event
 | ||||
|         let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) = | ||||
|             match db | ||||
|  | @ -771,8 +764,12 @@ pub async fn send_transaction_message_route<'a>( | |||
|             ); | ||||
|         }; | ||||
| 
 | ||||
|         // Gather the forward extremities and resolve
 | ||||
|         let fork_states = match forward_extremities( | ||||
|         // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
 | ||||
|         // where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote)
 | ||||
|         //
 | ||||
|         // calculate_forward_extremities takes care of adding the current state if not already in the state sets
 | ||||
|         // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
 | ||||
|         let (mut fork_states, fork_ids) = match calculate_forward_extremities( | ||||
|             &db, | ||||
|             &pdu, | ||||
|             server_name, | ||||
|  | @ -788,6 +785,12 @@ pub async fn send_transaction_message_route<'a>( | |||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         // add the incoming events to the mix of state snapshots
 | ||||
|         // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets
 | ||||
|         fork_states.insert(state_at_event.clone()); | ||||
| 
 | ||||
|         let fork_states = fork_states.into_iter().collect::<Vec<_>>(); | ||||
| 
 | ||||
|         // 13. start state-res with all previous forward extremities minus the ones that are in
 | ||||
|         // the prev_events of this event plus the new one created by this event and use
 | ||||
|         // the result as the new room state
 | ||||
|  | @ -901,7 +904,9 @@ pub async fn send_transaction_message_route<'a>( | |||
|                 Err("Event has been soft failed".into()), | ||||
|             ); | ||||
|         } else { | ||||
|             append_state(&db, &pdu)?; | ||||
|             // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
 | ||||
|             append_state(&db, &pdu, &fork_ids)?; | ||||
| 
 | ||||
|             // Event has passed all auth/stateres checks
 | ||||
|             resolved_map.insert(pdu.event_id().clone(), Ok(())); | ||||
|         } | ||||
|  | @ -1106,25 +1111,52 @@ async fn fetch_signing_keys( | |||
| /// Gather all state snapshots needed to resolve the current state of the room.
 | ||||
| ///
 | ||||
| /// Step 11. ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
 | ||||
| /// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote)
 | ||||
| async fn forward_extremities( | ||||
| /// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote).
 | ||||
| ///
 | ||||
| /// The state snapshot of the incoming event __needs__ to be added to the resulting list.
 | ||||
| async fn calculate_forward_extremities( | ||||
|     db: &Database, | ||||
|     pdu: &PduEvent, | ||||
|     origin: &ServerName, | ||||
|     pub_key_map: &PublicKeyMap, | ||||
|     auth_cache: &mut EventMap<Arc<PduEvent>>, | ||||
| ) -> Result<Vec<StateMap<Arc<PduEvent>>>> { | ||||
| ) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> { | ||||
|     let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; | ||||
| 
 | ||||
|     let mut is_incoming_leaf = true; | ||||
|     // Make sure the incoming event is not already a forward extremity
 | ||||
|     // FIXME: I think this could happen if different servers send us the same event??
 | ||||
|     if current_leaves.contains(pdu.event_id()) { | ||||
|         is_incoming_leaf = false; | ||||
|         // Not sure what to do here
 | ||||
|     } | ||||
| 
 | ||||
|     // If the incoming event is already referenced by an existing event
 | ||||
|     // then do nothing - it's not a candidate to be a new extremity if
 | ||||
|     // it has been referenced.
 | ||||
|     if already_referenced(db, pdu)? { | ||||
|         is_incoming_leaf = false; | ||||
|         // This event has been dealt with already??
 | ||||
|     } | ||||
| 
 | ||||
|     // TODO:
 | ||||
|     // [dendrite] Checks if any other leaves have been referenced and removes them
 | ||||
|     // but as long as we update the pdu leaves here and for events on our server this
 | ||||
|     // should not be possible.
 | ||||
| 
 | ||||
|     // Remove any forward extremities that are referenced by this incoming events prev_events
 | ||||
|     for incoming_leaf in &pdu.prev_events { | ||||
|         if !current_leaves.contains(incoming_leaf) { | ||||
|             current_leaves.push(incoming_leaf.clone()); | ||||
|         if current_leaves.contains(incoming_leaf) { | ||||
|             if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) { | ||||
|                 current_leaves.remove(pos); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let current_hash = db.rooms.current_state_hash(pdu.room_id())?; | ||||
| 
 | ||||
|     let mut includes_current_state = false; | ||||
|     let mut fork_states = vec![]; | ||||
|     let mut fork_states = BTreeSet::new(); | ||||
|     for id in ¤t_leaves { | ||||
|         if let Some(id) = db.rooms.get_pdu_id(id)? { | ||||
|             let state_hash = db | ||||
|  | @ -1142,8 +1174,10 @@ async fn forward_extremities( | |||
|                 .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) | ||||
|                 .collect(); | ||||
| 
 | ||||
|             fork_states.push(state); | ||||
|             fork_states.insert(state); | ||||
|         } else { | ||||
|             error!("Forward extremity not found... {}", id); | ||||
| 
 | ||||
|             let res = db | ||||
|                 .sending | ||||
|                 .send_federation_request( | ||||
|  | @ -1166,25 +1200,37 @@ async fn forward_extremities( | |||
|                 .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) | ||||
|                 .collect(); | ||||
| 
 | ||||
|             fork_states.push(state); | ||||
|             fork_states.insert(state); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // Add the incoming event only if it is a leaf, we do this after fetching all the
 | ||||
|     // state since we know we have already fetched the state of the incoming event so lets
 | ||||
|     // not do it again!
 | ||||
|     if is_incoming_leaf { | ||||
|         current_leaves.push(pdu.event_id().clone()); | ||||
|     } | ||||
| 
 | ||||
|     // This guarantees that our current room state is included
 | ||||
|     if !includes_current_state && current_hash.is_some() { | ||||
|         fork_states.push( | ||||
|         fork_states.insert( | ||||
|             db.rooms | ||||
|                 .state_full(pdu.room_id(), current_hash.as_ref().unwrap())? | ||||
|                 .into_iter() | ||||
|                 .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) | ||||
|                 .collect(), | ||||
|         ) | ||||
|         ); | ||||
|     } | ||||
| 
 | ||||
|     Ok(fork_states) | ||||
|     Ok((fork_states, dbg!(current_leaves))) | ||||
| } | ||||
| 
 | ||||
| fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> { | ||||
| /// TODO: we need to know if the event is a prev_event (is this event already referenced in the DAG)
 | ||||
| fn already_referenced(_db: &Database, _pdu: &PduEvent) -> Result<bool> { | ||||
|     Ok(false) | ||||
| } | ||||
| 
 | ||||
| fn append_state(db: &Database, pdu: &PduEvent, new_room_leaves: &[EventId]) -> Result<()> { | ||||
|     let count = db.globals.next_count()?; | ||||
|     let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|     pdu_id.push(0xff); | ||||
|  | @ -1195,13 +1241,17 @@ fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> { | |||
|     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||
| 
 | ||||
|     db.rooms.append_pdu( | ||||
|         &pdu, | ||||
|         pdu, | ||||
|         utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), | ||||
|         count, | ||||
|         pdu_id.clone().into(), | ||||
|         &db, | ||||
|     )?; | ||||
| 
 | ||||
|     // If we update the room leaves after calling append_pdu it will stick since append_pdu
 | ||||
|     // calls replace_pdu_leaves with only the given event.
 | ||||
|     db.rooms.force_pdu_leaves(pdu.room_id(), new_room_leaves)?; | ||||
| 
 | ||||
|     // We set the room state after inserting the pdu, so that we never have a moment in time
 | ||||
|     // where events in the current room state do not exist
 | ||||
|     db.rooms.set_room_state(&pdu.room_id, &statehashid)?; | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue