Fix and integrate outlier tree, build forks after adding event to DB
This commit is contained in:
		
							parent
							
								
									6fd3e1d1dd
								
							
						
					
					
						commit
						6661de50ab
					
				
					 3 changed files with 263 additions and 168 deletions
				
			
		|  | @ -159,7 +159,7 @@ impl Database { | |||
|                 stateid_pduid: db.open_tree("stateid_pduid")?, | ||||
|                 pduid_statehash: db.open_tree("pduid_statehash")?, | ||||
|                 roomid_statehash: db.open_tree("roomid_statehash")?, | ||||
|                 eventid_outlierpdu: db.open_tree("eventid_outlierpdu")?, | ||||
|                 pduid_outlierpdu: db.open_tree("pduid_outlierpdu")?, | ||||
|             }, | ||||
|             account_data: account_data::AccountData { | ||||
|                 roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, | ||||
|  |  | |||
|  | @ -27,9 +27,10 @@ use std::{ | |||
|     convert::{TryFrom, TryInto}, | ||||
|     mem, | ||||
|     sync::Arc, | ||||
|     time::Duration, | ||||
| }; | ||||
| 
 | ||||
| use super::admin::AdminCommand; | ||||
| use super::{admin::AdminCommand, sending::Sending}; | ||||
| 
 | ||||
| /// The unique identifier of each state group.
 | ||||
| ///
 | ||||
|  | @ -67,7 +68,7 @@ pub struct Rooms { | |||
|     pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid)
 | ||||
| 
 | ||||
|     /// Any pdu that has passed the steps up to auth with auth_events.
 | ||||
|     pub(super) eventid_outlierpdu: sled::Tree, | ||||
|     pub(super) pduid_outlierpdu: sled::Tree, | ||||
| } | ||||
| 
 | ||||
| impl Rooms { | ||||
|  | @ -85,13 +86,20 @@ impl Rooms { | |||
|                 let mut pduid = room_id.as_bytes().to_vec(); | ||||
|                 pduid.push(0xff); | ||||
|                 pduid.extend_from_slice(&pduid_short?); | ||||
|                 self.pduid_pdu.get(&pduid)?.map_or_else( | ||||
|                     || Err(Error::bad_database("Failed to find PDU in state snapshot.")), | ||||
|                     |b| { | ||||
|                         serde_json::from_slice::<PduEvent>(&b) | ||||
|                             .map_err(|_| Error::bad_database("Invalid PDU in db.")) | ||||
|                     }, | ||||
|                 ) | ||||
|                 match self.pduid_pdu.get(&pduid)? { | ||||
|                     Some(b) => serde_json::from_slice::<PduEvent>(&b) | ||||
|                         .map_err(|_| Error::bad_database("Invalid PDU in db.")), | ||||
|                     None => self | ||||
|                         .pduid_outlierpdu | ||||
|                         .get(pduid)? | ||||
|                         .map(|b| { | ||||
|                             serde_json::from_slice::<PduEvent>(&b) | ||||
|                                 .map_err(|_| Error::bad_database("Invalid PDU in db.")) | ||||
|                         }) | ||||
|                         .ok_or_else(|| { | ||||
|                             Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                         })?, | ||||
|                 } | ||||
|             }) | ||||
|             .filter_map(|r| r.ok()) | ||||
|             .map(|pdu| { | ||||
|  | @ -137,12 +145,20 @@ impl Rooms { | |||
| 
 | ||||
|                     Ok::<_, Error>(Some(( | ||||
|                         pdu_id.clone().into(), | ||||
|                         serde_json::from_slice::<PduEvent>( | ||||
|                             &self.pduid_pdu.get(&pdu_id)?.ok_or_else(|| { | ||||
|                                 Error::bad_database("PDU in state not found in database.") | ||||
|                             })?, | ||||
|                         ) | ||||
|                         .map_err(|_| Error::bad_database("Invalid PDU bytes in room state."))?, | ||||
|                         match self.pduid_pdu.get(&pdu_id)? { | ||||
|                             Some(b) => serde_json::from_slice::<PduEvent>(&b) | ||||
|                                 .map_err(|_| Error::bad_database("Invalid PDU in db."))?, | ||||
|                             None => self | ||||
|                                 .pduid_outlierpdu | ||||
|                                 .get(pdu_id)? | ||||
|                                 .map(|b| { | ||||
|                                     serde_json::from_slice::<PduEvent>(&b) | ||||
|                                         .map_err(|_| Error::bad_database("Invalid PDU in db.")) | ||||
|                                 }) | ||||
|                                 .ok_or_else(|| { | ||||
|                                     Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                                 })??, | ||||
|                         }, | ||||
|                     ))) | ||||
|                 }) | ||||
|         } else { | ||||
|  | @ -307,9 +323,12 @@ impl Rooms { | |||
|             .get(event_id.as_bytes())? | ||||
|             .map_or(Ok(None), |pdu_id| { | ||||
|                 Ok(Some( | ||||
|                     serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| { | ||||
|                         Error::bad_database("eventid_pduid points to nonexistent pdu.") | ||||
|                     })?) | ||||
|                     serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { | ||||
|                         Some(b) => b, | ||||
|                         None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| { | ||||
|                             Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                         })?, | ||||
|                     }) | ||||
|                     .map_err(|_| Error::bad_database("Invalid PDU in db."))?, | ||||
|                 )) | ||||
|             }) | ||||
|  | @ -328,13 +347,17 @@ impl Rooms { | |||
|             .get(event_id.as_bytes())? | ||||
|             .map_or(Ok(None), |pdu_id| { | ||||
|                 Ok(Some( | ||||
|                     serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| { | ||||
|                         Error::bad_database("eventid_pduid points to nonexistent pdu.") | ||||
|                     })?) | ||||
|                     serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { | ||||
|                         Some(b) => b, | ||||
|                         None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| { | ||||
|                             Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                         })?, | ||||
|                     }) | ||||
|                     .map_err(|_| Error::bad_database("Invalid PDU in db."))?, | ||||
|                 )) | ||||
|             }) | ||||
|     } | ||||
| 
 | ||||
|     /// Returns the pdu.
 | ||||
|     pub fn get_pdu_from_id(&self, pdu_id: &IVec) -> Result<Option<PduEvent>> { | ||||
|         self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { | ||||
|  | @ -420,23 +443,27 @@ impl Rooms { | |||
| 
 | ||||
|     /// Returns the pdu from the outlier tree.
 | ||||
|     pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> { | ||||
|         self.eventid_outlierpdu | ||||
|             .get(event_id.as_bytes())? | ||||
|             .map_or(Ok(None), |pdu| { | ||||
|                 Ok(Some( | ||||
|                     serde_json::from_slice(&pdu) | ||||
|                         .map_err(|_| Error::bad_database("Invalid PDU in db."))?, | ||||
|                 )) | ||||
|         if let Some(id) = self.eventid_pduid.get(event_id.as_bytes())? { | ||||
|             self.pduid_outlierpdu.get(id)?.map_or(Ok(None), |pdu| { | ||||
|                 serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) | ||||
|             }) | ||||
|         } else { | ||||
|             Ok(None) | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Returns true if the event_id was previously inserted.
 | ||||
|     pub fn append_pdu_outlier(&self, event_id: &EventId, pdu: &PduEvent) -> Result<bool> { | ||||
|         log::info!("Number of outlier pdu's {}", self.eventid_outlierpdu.len()); | ||||
|     pub fn append_pdu_outlier(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<bool> { | ||||
|         log::info!("Number of outlier pdu's {}", self.pduid_outlierpdu.len()); | ||||
| 
 | ||||
|         // we need to be able to find it by event_id
 | ||||
|         self.eventid_pduid | ||||
|             .insert(pdu.event_id.as_bytes(), &*pdu_id)?; | ||||
| 
 | ||||
|         let res = self | ||||
|             .eventid_outlierpdu | ||||
|             .pduid_outlierpdu | ||||
|             .insert( | ||||
|                 event_id.as_bytes(), | ||||
|                 pdu_id, | ||||
|                 &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), | ||||
|             ) | ||||
|             .map(|op| op.is_some())?; | ||||
|  | @ -484,7 +511,9 @@ impl Rooms { | |||
|         } | ||||
| 
 | ||||
|         // We no longer keep this pdu as an outlier
 | ||||
|         self.eventid_outlierpdu.remove(pdu.event_id().as_bytes())?; | ||||
|         if let Some(id) = self.eventid_pduid.remove(pdu.event_id().as_bytes())? { | ||||
|             self.pduid_outlierpdu.remove(id)?; | ||||
|         } | ||||
| 
 | ||||
|         self.replace_pdu_leaves(&pdu.room_id, leaves)?; | ||||
| 
 | ||||
|  |  | |||
|  | @ -614,7 +614,7 @@ pub async fn send_transaction_message_route<'a>( | |||
|         // 7. if not timeline event: stop
 | ||||
|         // TODO; 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 | ||||
|         // the events found in step 8 can be authed/resolved and appended to the DB
 | ||||
|         let (pdu, previous): (_, Vec<Arc<PduEvent>>) = match validate_event( | ||||
|         let (pdu, previous): (Arc<PduEvent>, Vec<Arc<PduEvent>>) = match validate_event( | ||||
|             &db, | ||||
|             value, | ||||
|             event_id.clone(), | ||||
|  | @ -638,69 +638,75 @@ pub async fn send_transaction_message_route<'a>( | |||
|             None | ||||
|         }; | ||||
| 
 | ||||
|         let count = db.globals.next_count()?; | ||||
|         let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|         pdu_id.push(0xff); | ||||
|         pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||
|         // 6. persist the event as an outlier.
 | ||||
|         db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?; | ||||
|         db.rooms.append_pdu_outlier(&pdu_id, &pdu)?; | ||||
| 
 | ||||
|         // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
 | ||||
|         // the checks in this list starting at 1. These are not timeline events.
 | ||||
|         //
 | ||||
|         // Step 10. check the auth of the event passes based on the calculated state of the event
 | ||||
|         let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) = | ||||
|             match db | ||||
|                 .sending | ||||
|                 .send_federation_request( | ||||
|                     &db.globals, | ||||
|         let (mut state_at_event, incoming_auth_events): ( | ||||
|             StateMap<Arc<PduEvent>>, | ||||
|             Vec<Arc<PduEvent>>, | ||||
|         ) = match db | ||||
|             .sending | ||||
|             .send_federation_request( | ||||
|                 &db.globals, | ||||
|                 server_name, | ||||
|                 get_room_state_ids::v1::Request { | ||||
|                     room_id: pdu.room_id(), | ||||
|                     event_id: pdu.event_id(), | ||||
|                 }, | ||||
|             ) | ||||
|             .await | ||||
|         { | ||||
|             Ok(res) => { | ||||
|                 let state = fetch_events( | ||||
|                     &db, | ||||
|                     server_name, | ||||
|                     get_room_state_ids::v1::Request { | ||||
|                         room_id: pdu.room_id(), | ||||
|                         event_id: pdu.event_id(), | ||||
|                     }, | ||||
|                     &pub_key_map, | ||||
|                     &res.pdu_ids, | ||||
|                     &mut auth_cache, | ||||
|                 ) | ||||
|                 .await | ||||
|             { | ||||
|                 Ok(res) => { | ||||
|                     let state = fetch_events( | ||||
|                 .await?; | ||||
|                 // Sanity check: there are no conflicting events in the state we received
 | ||||
|                 let mut seen = BTreeSet::new(); | ||||
|                 for ev in &state { | ||||
|                     // If the key is already present
 | ||||
|                     if !seen.insert((&ev.kind, &ev.state_key)) { | ||||
|                         todo!("Server sent us an invalid state") | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 let state = state | ||||
|                     .into_iter() | ||||
|                     .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) | ||||
|                     .collect(); | ||||
| 
 | ||||
|                 ( | ||||
|                     state, | ||||
|                     fetch_events( | ||||
|                         &db, | ||||
|                         server_name, | ||||
|                         &pub_key_map, | ||||
|                         &res.pdu_ids, | ||||
|                         &res.auth_chain_ids, | ||||
|                         &mut auth_cache, | ||||
|                     ) | ||||
|                     .await?; | ||||
|                     // Sanity check: there are no conflicting events in the state we received
 | ||||
|                     let mut seen = BTreeSet::new(); | ||||
|                     for ev in &state { | ||||
|                         // If the key is already present
 | ||||
|                         if !seen.insert((&ev.kind, &ev.state_key)) { | ||||
|                             todo!("Server sent us an invalid state") | ||||
|                         } | ||||
|                     } | ||||
| 
 | ||||
|                     let state = state | ||||
|                         .into_iter() | ||||
|                         .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) | ||||
|                         .collect(); | ||||
| 
 | ||||
|                     ( | ||||
|                         state, | ||||
|                         fetch_events( | ||||
|                             &db, | ||||
|                             server_name, | ||||
|                             &pub_key_map, | ||||
|                             &res.auth_chain_ids, | ||||
|                             &mut auth_cache, | ||||
|                         ) | ||||
|                         .await?, | ||||
|                     ) | ||||
|                 } | ||||
|                 Err(_) => { | ||||
|                     resolved_map.insert( | ||||
|                         pdu.event_id().clone(), | ||||
|                         Err("Fetching state for event failed".into()), | ||||
|                     ); | ||||
|                     continue; | ||||
|                 } | ||||
|             }; | ||||
|                     .await?, | ||||
|                 ) | ||||
|             } | ||||
|             Err(_) => { | ||||
|                 resolved_map.insert( | ||||
|                     pdu.event_id().clone(), | ||||
|                     Err("Fetching state for event failed".into()), | ||||
|                 ); | ||||
|                 continue; | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         // 10. This is the actual auth check for state at the event
 | ||||
|         if !state_res::event_auth::auth_check( | ||||
|  | @ -750,12 +756,25 @@ pub async fn send_transaction_message_route<'a>( | |||
|         //
 | ||||
|         // calculate_forward_extremities takes care of adding the current state if not already in the state sets
 | ||||
|         // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
 | ||||
|         let (mut fork_states, extremities) = match calculate_forward_extremities( | ||||
|         let extremities = match calculate_forward_extremities(&db, &pdu).await { | ||||
|             Ok(fork_ids) => fork_ids, | ||||
|             Err(_) => { | ||||
|                 resolved_map.insert(event_id, Err("Failed to gather forward extremities".into())); | ||||
|                 continue; | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         // Now that the event has passed all auth it is added into the timeline, we do have to
 | ||||
|         // find the leaves otherwise we would do this sooner
 | ||||
|         append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; | ||||
| 
 | ||||
|         let mut fork_states = match build_forward_extremity_snapshots( | ||||
|             &db, | ||||
|             &pdu, | ||||
|             pdu.room_id(), | ||||
|             server_name, | ||||
|             &pub_key_map, | ||||
|             current_state, | ||||
|             &extremities, | ||||
|             &pub_key_map, | ||||
|             &mut auth_cache, | ||||
|         ) | ||||
|         .await | ||||
|  | @ -767,6 +786,9 @@ pub async fn send_transaction_message_route<'a>( | |||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         // Make this the state after (since we appended_incoming_pdu this should agree with our servers
 | ||||
|         // current state).
 | ||||
|         state_at_event.insert((pdu.kind(), pdu.state_key()), pdu.clone()); | ||||
|         // add the incoming events to the mix of state snapshots
 | ||||
|         // Since we are using a BTreeSet (yea this may be overkill) we guarantee unique state sets
 | ||||
|         fork_states.insert(state_at_event.clone()); | ||||
|  | @ -840,7 +862,7 @@ pub async fn send_transaction_message_route<'a>( | |||
|             ); | ||||
| 
 | ||||
|             let res = match state_res::StateResolution::resolve( | ||||
|                 &pdu.room_id, | ||||
|                 pdu.room_id(), | ||||
|                 &RoomVersionId::Version6, | ||||
|                 &fork_states | ||||
|                     .into_iter() | ||||
|  | @ -865,6 +887,7 @@ pub async fn send_transaction_message_route<'a>( | |||
|                     continue 'main_pdu_loop; | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             let mut resolved = BTreeMap::new(); | ||||
|             for (k, id) in res { | ||||
|                 // We should know of the event but just incase
 | ||||
|  | @ -890,10 +913,9 @@ pub async fn send_transaction_message_route<'a>( | |||
|         }; | ||||
| 
 | ||||
|         // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
 | ||||
|         append_incoming_pdu( | ||||
|         update_resolved_state( | ||||
|             &db, | ||||
|             &pdu, | ||||
|             &extremities, | ||||
|             pdu.room_id(), | ||||
|             if update_state { | ||||
|                 Some(state_at_forks) | ||||
|             } else { | ||||
|  | @ -905,7 +927,10 @@ pub async fn send_transaction_message_route<'a>( | |||
|         resolved_map.insert(pdu.event_id().clone(), Ok(())); | ||||
|     } | ||||
| 
 | ||||
|     Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) | ||||
|     Ok(send_transaction_message::v1::Response { | ||||
|         pdus: dbg!(resolved_map), | ||||
|     } | ||||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| /// An async function that can recursively calls itself.
 | ||||
|  | @ -1036,13 +1061,14 @@ async fn fetch_check_auth_events( | |||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| /// Find the event and auth it.
 | ||||
| /// Find the event and auth it. Once the event is validated (steps 1 - 8)
 | ||||
| /// it is appended to the outliers Tree.
 | ||||
| ///
 | ||||
| /// 1. Look in the main timeline (pduid_pdu tree)
 | ||||
| /// 2. Look at outlier pdu tree
 | ||||
| /// 3. Ask origin server over federation
 | ||||
| /// 4. TODO: Ask other servers over federation?
 | ||||
| async fn fetch_events( | ||||
| pub(crate) async fn fetch_events( | ||||
|     db: &Database, | ||||
|     origin: &ServerName, | ||||
|     key_map: &PublicKeyMap, | ||||
|  | @ -1071,6 +1097,13 @@ async fn fetch_events( | |||
|                                 .await | ||||
|                                 .map_err(|_| Error::Conflict("Authentication of event failed"))?; | ||||
| 
 | ||||
|                         // create the pduid for this event but stick it in the outliers DB
 | ||||
|                         let count = db.globals.next_count()?; | ||||
|                         let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|                         pdu_id.push(0xff); | ||||
|                         pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||
| 
 | ||||
|                         db.rooms.append_pdu_outlier(&pdu_id, &pdu)?; | ||||
|                         pdu | ||||
|                     } | ||||
|                     Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), | ||||
|  | @ -1084,7 +1117,7 @@ async fn fetch_events( | |||
| 
 | ||||
| /// Search the DB for the signing keys of the given server, if we don't have them
 | ||||
| /// fetch them from the server and save to our DB.
 | ||||
| async fn fetch_signing_keys( | ||||
| pub(crate) async fn fetch_signing_keys( | ||||
|     db: &Database, | ||||
|     origin: &ServerName, | ||||
| ) -> Result<BTreeMap<ServerSigningKeyId, VerifyKey>> { | ||||
|  | @ -1108,26 +1141,28 @@ async fn fetch_signing_keys( | |||
| /// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote).
 | ||||
| ///
 | ||||
| /// The state snapshot of the incoming event __needs__ to be added to the resulting list.
 | ||||
| async fn calculate_forward_extremities( | ||||
| pub(crate) async fn calculate_forward_extremities( | ||||
|     db: &Database, | ||||
|     pdu: &PduEvent, | ||||
|     origin: &ServerName, | ||||
|     pub_key_map: &PublicKeyMap, | ||||
|     current_state: BTreeMap<(EventType, Option<String>), Arc<PduEvent>>, | ||||
|     auth_cache: &mut EventMap<Arc<PduEvent>>, | ||||
| ) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> { | ||||
| ) -> Result<Vec<EventId>> { | ||||
|     let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?; | ||||
| 
 | ||||
|     let mut is_incoming_leaf = true; | ||||
|     // Make sure the incoming event is not already a forward extremity
 | ||||
|     // FIXME: I think this could happen if different servers send us the same event??
 | ||||
|     //
 | ||||
|     if current_leaves.contains(pdu.event_id()) { | ||||
|         is_incoming_leaf = false; | ||||
|         // Not sure what to do here
 | ||||
|     } | ||||
| 
 | ||||
|     // If the incoming event is already referenced by an existing event
 | ||||
|     // then do nothing - it's not a candidate to be a new extremity if
 | ||||
|     // it has been referenced.
 | ||||
|     if current_leaves.contains(pdu.event_id()) || db.rooms.get_pdu_id(pdu.event_id())?.is_some() { | ||||
|         is_incoming_leaf = false; | ||||
|         // Not sure what to do here
 | ||||
|     //
 | ||||
|     // We first check if know of the event and then don't include it as a forward
 | ||||
|     // extremity if it is a timeline event
 | ||||
|     if db.rooms.get_pdu_id(pdu.event_id())?.is_some() { | ||||
|         is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some(); | ||||
|     } | ||||
| 
 | ||||
|     // TODO:
 | ||||
|  | @ -1144,11 +1179,34 @@ async fn calculate_forward_extremities( | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let current_hash = db.rooms.current_state_hash(pdu.room_id())?; | ||||
|     // Add the incoming event only if it is a leaf, we do this after fetching all the
 | ||||
|     // state since we know we have already fetched the state of the incoming event so lets
 | ||||
|     // not do it again!
 | ||||
|     if is_incoming_leaf { | ||||
|         current_leaves.push(pdu.event_id().clone()); | ||||
|     } | ||||
| 
 | ||||
|     Ok(current_leaves) | ||||
| } | ||||
| 
 | ||||
| /// This should always be called after the incoming event has been appended to the DB.
 | ||||
| ///
 | ||||
| /// This guarentees that the incoming event will be in the state sets (at least our servers
 | ||||
| /// and the sending server).
 | ||||
| pub(crate) async fn build_forward_extremity_snapshots( | ||||
|     db: &Database, | ||||
|     room_id: &RoomId, | ||||
|     origin: &ServerName, | ||||
|     current_state: StateMap<Arc<PduEvent>>, | ||||
|     current_leaves: &[EventId], | ||||
|     pub_key_map: &PublicKeyMap, | ||||
|     auth_cache: &mut EventMap<Arc<PduEvent>>, | ||||
| ) -> Result<BTreeSet<StateMap<Arc<PduEvent>>>> { | ||||
|     let current_hash = db.rooms.current_state_hash(room_id)?; | ||||
| 
 | ||||
|     let mut includes_current_state = false; | ||||
|     let mut fork_states = BTreeSet::new(); | ||||
|     for id in ¤t_leaves { | ||||
|     for id in current_leaves { | ||||
|         if let Some(id) = db.rooms.get_pdu_id(id)? { | ||||
|             let state_hash = db | ||||
|                 .rooms | ||||
|  | @ -1158,14 +1216,21 @@ async fn calculate_forward_extremities( | |||
|             if current_hash.as_ref() == Some(&state_hash) { | ||||
|                 includes_current_state = true; | ||||
|             } | ||||
|             let state = db | ||||
| 
 | ||||
|             let mut state_before = db | ||||
|                 .rooms | ||||
|                 .state_full(&pdu.room_id, &state_hash)? | ||||
|                 .state_full(room_id, &state_hash)? | ||||
|                 .into_iter() | ||||
|                 .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) | ||||
|                 .collect(); | ||||
|                 .collect::<StateMap<_>>(); | ||||
| 
 | ||||
|             fork_states.insert(state); | ||||
|             // Now it's the state after
 | ||||
|             if let Some(pdu) = db.rooms.get_pdu_from_id(&id)? { | ||||
|                 let key = (pdu.kind.clone(), pdu.state_key()); | ||||
|                 state_before.insert(key, Arc::new(pdu)); | ||||
|             } | ||||
| 
 | ||||
|             fork_states.insert(state_before); | ||||
|         } else { | ||||
|             let res = db | ||||
|                 .sending | ||||
|  | @ -1173,7 +1238,7 @@ async fn calculate_forward_extremities( | |||
|                     &db.globals, | ||||
|                     origin, | ||||
|                     get_room_state_ids::v1::Request { | ||||
|                         room_id: pdu.room_id(), | ||||
|                         room_id, | ||||
|                         event_id: id, | ||||
|                     }, | ||||
|                 ) | ||||
|  | @ -1181,41 +1246,38 @@ async fn calculate_forward_extremities( | |||
| 
 | ||||
|             // TODO: This only adds events to the auth_cache, there is for sure a better way to
 | ||||
|             // do this...
 | ||||
|             fetch_events(&db, origin, &pub_key_map, &res.auth_chain_ids, auth_cache).await?; | ||||
|             fetch_events(&db, origin, pub_key_map, &res.auth_chain_ids, auth_cache).await?; | ||||
| 
 | ||||
|             let state = fetch_events(&db, origin, &pub_key_map, &res.pdu_ids, auth_cache) | ||||
|             let mut state_before = fetch_events(&db, origin, pub_key_map, &res.pdu_ids, auth_cache) | ||||
|                 .await? | ||||
|                 .into_iter() | ||||
|                 .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu)) | ||||
|                 .collect(); | ||||
|                 .collect::<StateMap<_>>(); | ||||
| 
 | ||||
|             fork_states.insert(state); | ||||
|             if let Some(pdu) = fetch_events(db, origin, pub_key_map, &[id.clone()], auth_cache) | ||||
|                 .await? | ||||
|                 .pop() | ||||
|             { | ||||
|                 let key = (pdu.kind.clone(), pdu.state_key()); | ||||
|                 state_before.insert(key, pdu); | ||||
|             } | ||||
| 
 | ||||
|             // Now it's the state after
 | ||||
|             fork_states.insert(state_before); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // Add the incoming event only if it is a leaf, we do this after fetching all the
 | ||||
|     // state since we know we have already fetched the state of the incoming event so lets
 | ||||
|     // not do it again!
 | ||||
|     if is_incoming_leaf { | ||||
|         current_leaves.push(pdu.event_id().clone()); | ||||
|     } | ||||
| 
 | ||||
|     // This guarantees that our current room state is included
 | ||||
|     if !includes_current_state && current_hash.is_some() { | ||||
|         fork_states.insert(current_state); | ||||
|     } | ||||
| 
 | ||||
|     Ok((fork_states, current_leaves)) | ||||
|     Ok(fork_states) | ||||
| } | ||||
| 
 | ||||
| /// Update the room state to be the resolved state and add the fully auth'ed event
 | ||||
| /// to the DB.
 | ||||
| ///
 | ||||
| /// TODO: Since all these events passed state resolution can we trust them to add
 | ||||
| fn append_incoming_pdu( | ||||
| pub(crate) fn update_resolved_state( | ||||
|     db: &Database, | ||||
|     pdu: &PduEvent, | ||||
|     new_room_leaves: &[EventId], | ||||
|     room_id: &RoomId, | ||||
|     state: Option<StateMap<Arc<PduEvent>>>, | ||||
| ) -> Result<()> { | ||||
|     // Update the state of the room if needed
 | ||||
|  | @ -1236,44 +1298,50 @@ fn append_incoming_pdu( | |||
|                     ); | ||||
|                 } | ||||
|                 None => { | ||||
|                     let count = db.globals.next_count()?; | ||||
|                     let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|                     pdu_id.push(0xff); | ||||
|                     pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||
| 
 | ||||
|                     // TODO: can we use are current state if we just add this event to the end of our
 | ||||
|                     // pduid_pdu tree??
 | ||||
|                     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||
| 
 | ||||
|                     db.rooms.append_pdu( | ||||
|                         &*pdu, | ||||
|                         utils::to_canonical_object(&*pdu).expect("Pdu is valid canonical object"), | ||||
|                         count, | ||||
|                         pdu_id.clone().into(), | ||||
|                         &new_room_leaves, | ||||
|                         &db, | ||||
|                     )?; | ||||
|                     // TODO: is this ok...
 | ||||
|                     db.rooms.set_room_state(&pdu.room_id, &statehashid)?; | ||||
|                     new_state.insert( | ||||
|                         ( | ||||
|                             ev_type, | ||||
|                             state_k.ok_or_else(|| { | ||||
|                                 Error::Conflict("State contained non state event") | ||||
|                             })?, | ||||
|                         ), | ||||
|                         pdu_id.to_vec(), | ||||
|                     ); | ||||
|                     error!("We didn't append an event as an outlier\n{:?}", pdu); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         info!("Force update of state for {:?}", pdu); | ||||
| 
 | ||||
|         db.rooms | ||||
|             .force_state(pdu.room_id(), new_state, &db.globals)?; | ||||
|         db.rooms.force_state(room_id, new_state, &db.globals)?; | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| /// Append the incoming event setting the state snapshot to the state from the
 | ||||
| /// server that sent the event.
 | ||||
| pub(crate) fn append_incoming_pdu( | ||||
|     db: &Database, | ||||
|     pdu: &PduEvent, | ||||
|     new_room_leaves: &[EventId], | ||||
|     state: &StateMap<Arc<PduEvent>>, | ||||
| ) -> Result<()> { | ||||
|     // Update the state of the room if needed
 | ||||
|     // We can tell if we need to do this based on wether state resolution took place or not
 | ||||
|     let mut new_state = HashMap::new(); | ||||
|     for ((ev_type, state_k), pdu) in state { | ||||
|         match db.rooms.get_pdu_id(pdu.event_id())? { | ||||
|             Some(pduid) => { | ||||
|                 new_state.insert( | ||||
|                     ( | ||||
|                         ev_type.clone(), | ||||
|                         state_k | ||||
|                             .clone() | ||||
|                             .ok_or_else(|| Error::Conflict("State contained non state event"))?, | ||||
|                     ), | ||||
|                     pduid.to_vec(), | ||||
|                 ); | ||||
|             } | ||||
|             None => { | ||||
|                 error!("We didn't append an event as an outlier\n{:?}", pdu); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     db.rooms | ||||
|         .force_state(pdu.room_id(), new_state, &db.globals)?; | ||||
| 
 | ||||
|     let count = db.globals.next_count()?; | ||||
|     let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|     pdu_id.push(0xff); | ||||
|  | @ -1281,7 +1349,7 @@ fn append_incoming_pdu( | |||
| 
 | ||||
|     // We append to state before appending the pdu, so we don't have a moment in time with the
 | ||||
|     // pdu without it's state. This is okay because append_pdu can't fail.
 | ||||
|     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||
|     let state_hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||
| 
 | ||||
|     db.rooms.append_pdu( | ||||
|         pdu, | ||||
|  | @ -1292,9 +1360,7 @@ fn append_incoming_pdu( | |||
|         &db, | ||||
|     )?; | ||||
| 
 | ||||
|     // We set the room state after inserting the pdu, so that we never have a moment in time
 | ||||
|     // where events in the current room state do not exist
 | ||||
|     db.rooms.set_room_state(&pdu.room_id, &statehashid)?; | ||||
|     db.rooms.set_room_state(pdu.room_id(), &state_hash)?; | ||||
| 
 | ||||
|     for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { | ||||
|         db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue