Use eventId when saving outliers
This commit is contained in:
		
							parent
							
								
									e09be2f7ee
								
							
						
					
					
						commit
						64374b4679
					
				
					 4 changed files with 84 additions and 65 deletions
				
			
		|  | @ -701,7 +701,7 @@ pub async fn sync_events_route( | |||
|         let delay = tokio::time::sleep(duration); | ||||
|         tokio::pin!(delay); | ||||
|         tokio::select! { | ||||
|             _ = &mut delay, if delay.is_elapsed() => {} | ||||
|             _ = &mut delay => {} | ||||
|             _ = watcher => {} | ||||
|         } | ||||
|     } | ||||
|  |  | |||
|  | @ -159,7 +159,7 @@ impl Database { | |||
|                 stateid_pduid: db.open_tree("stateid_pduid")?, | ||||
|                 pduid_statehash: db.open_tree("pduid_statehash")?, | ||||
|                 roomid_statehash: db.open_tree("roomid_statehash")?, | ||||
|                 pduid_outlierpdu: db.open_tree("pduid_outlierpdu")?, | ||||
|                 eventid_outlierpdu: db.open_tree("eventid_outlierpdu")?, | ||||
|             }, | ||||
|             account_data: account_data::AccountData { | ||||
|                 roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, | ||||
|  |  | |||
|  | @ -27,10 +27,9 @@ use std::{ | |||
|     convert::{TryFrom, TryInto}, | ||||
|     mem, | ||||
|     sync::Arc, | ||||
|     time::Duration, | ||||
| }; | ||||
| 
 | ||||
| use super::{admin::AdminCommand, sending::Sending}; | ||||
| use super::admin::AdminCommand; | ||||
| 
 | ||||
| /// The unique identifier of each state group.
 | ||||
| ///
 | ||||
|  | @ -67,13 +66,16 @@ pub struct Rooms { | |||
|     pub(super) statekey_short: sled::Tree, // StateKey = EventType + StateKey, Short = Count
 | ||||
|     pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid)
 | ||||
| 
 | ||||
|     /// Any pdu that has passed the steps up to auth with auth_events.
 | ||||
|     pub(super) pduid_outlierpdu: sled::Tree, | ||||
|     /// RoomId + EventId -> outlier PDU.
 | ||||
|     /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
 | ||||
|     pub(super) eventid_outlierpdu: sled::Tree, | ||||
| } | ||||
| 
 | ||||
| impl Rooms { | ||||
|     /// Builds a StateMap by iterating over all keys that start
 | ||||
|     /// with state_hash, this gives the full state for the given state_hash.
 | ||||
|     ///
 | ||||
|     /// TODO: Should this check for outliers, it does now.
 | ||||
|     pub fn state_full( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|  | @ -90,7 +92,7 @@ impl Rooms { | |||
|                     Some(b) => serde_json::from_slice::<PduEvent>(&b) | ||||
|                         .map_err(|_| Error::bad_database("Invalid PDU in db.")), | ||||
|                     None => self | ||||
|                         .pduid_outlierpdu | ||||
|                         .eventid_outlierpdu | ||||
|                         .get(pduid)? | ||||
|                         .map(|b| { | ||||
|                             serde_json::from_slice::<PduEvent>(&b) | ||||
|  | @ -118,6 +120,8 @@ impl Rooms { | |||
|     } | ||||
| 
 | ||||
|     /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
 | ||||
|     ///
 | ||||
|     /// TODO: Should this check for outliers, it does now.
 | ||||
|     pub fn state_get( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|  | @ -149,7 +153,7 @@ impl Rooms { | |||
|                             Some(b) => serde_json::from_slice::<PduEvent>(&b) | ||||
|                                 .map_err(|_| Error::bad_database("Invalid PDU in db."))?, | ||||
|                             None => self | ||||
|                                 .pduid_outlierpdu | ||||
|                                 .eventid_outlierpdu | ||||
|                                 .get(pdu_id)? | ||||
|                                 .map(|b| { | ||||
|                                     serde_json::from_slice::<PduEvent>(&b) | ||||
|  | @ -260,6 +264,8 @@ impl Rooms { | |||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             // Because of outliers this could also be an eventID but that
 | ||||
|             // is handled by `state_full`
 | ||||
|             let pdu_id_short = pdu_id | ||||
|                 .splitn(2, |&b| b == 0xff) | ||||
|                 .nth(1) | ||||
|  | @ -325,9 +331,12 @@ impl Rooms { | |||
|                 Ok(Some( | ||||
|                     serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { | ||||
|                         Some(b) => b, | ||||
|                         None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| { | ||||
|                             Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                         })?, | ||||
|                         None => self | ||||
|                             .eventid_outlierpdu | ||||
|                             .get(event_id.as_bytes())? | ||||
|                             .ok_or_else(|| { | ||||
|                                 Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                             })?, | ||||
|                     }) | ||||
|                     .map_err(|_| Error::bad_database("Invalid PDU in db."))?, | ||||
|                 )) | ||||
|  | @ -342,6 +351,8 @@ impl Rooms { | |||
|     } | ||||
| 
 | ||||
|     /// Returns the pdu.
 | ||||
|     ///
 | ||||
|     /// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
 | ||||
|     pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> { | ||||
|         self.eventid_pduid | ||||
|             .get(event_id.as_bytes())? | ||||
|  | @ -349,9 +360,12 @@ impl Rooms { | |||
|                 Ok(Some( | ||||
|                     serde_json::from_slice(&match self.pduid_pdu.get(&pdu_id)? { | ||||
|                         Some(b) => b, | ||||
|                         None => self.pduid_outlierpdu.get(pdu_id)?.ok_or_else(|| { | ||||
|                             Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                         })?, | ||||
|                         None => self | ||||
|                             .eventid_outlierpdu | ||||
|                             .get(event_id.as_bytes())? | ||||
|                             .ok_or_else(|| { | ||||
|                                 Error::bad_database("Event is not in pdu tree or outliers.") | ||||
|                             })?, | ||||
|                     }) | ||||
|                     .map_err(|_| Error::bad_database("Invalid PDU in db."))?, | ||||
|                 )) | ||||
|  | @ -443,27 +457,34 @@ impl Rooms { | |||
| 
 | ||||
|     /// Returns the pdu from the outlier tree.
 | ||||
|     pub fn get_pdu_outlier(&self, event_id: &EventId) -> Result<Option<PduEvent>> { | ||||
|         if let Some(id) = self.eventid_pduid.get(event_id.as_bytes())? { | ||||
|             self.pduid_outlierpdu.get(id)?.map_or(Ok(None), |pdu| { | ||||
|         self.eventid_outlierpdu | ||||
|             .get(event_id.as_bytes())? | ||||
|             .map_or(Ok(None), |pdu| { | ||||
|                 serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db.")) | ||||
|             }) | ||||
|         } else { | ||||
|             Ok(None) | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Returns true if the event_id was previously inserted.
 | ||||
|     pub fn append_pdu_outlier(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<bool> { | ||||
|         log::info!("Number of outlier pdu's {}", self.pduid_outlierpdu.len()); | ||||
|     pub fn append_pdu_outlier(&self, pdu: &PduEvent) -> Result<bool> { | ||||
|         log::info!( | ||||
|             "Number of outlier pdu's {:#?}", | ||||
|             self.eventid_outlierpdu | ||||
|                 .iter() | ||||
|                 .map(|pair| { | ||||
|                     let (_k, v) = pair.unwrap(); | ||||
|                     serde_json::from_slice::<PduBuilder>(&v).unwrap() | ||||
|                 }) | ||||
|                 .collect::<Vec<_>>() | ||||
|         ); | ||||
| 
 | ||||
|         // we need to be able to find it by event_id
 | ||||
|         self.eventid_pduid | ||||
|             .insert(pdu.event_id.as_bytes(), &*pdu_id)?; | ||||
|         let mut key = pdu.room_id().as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(pdu.event_id().as_bytes()); | ||||
| 
 | ||||
|         let res = self | ||||
|             .pduid_outlierpdu | ||||
|             .eventid_outlierpdu | ||||
|             .insert( | ||||
|                 pdu_id, | ||||
|                 &key, | ||||
|                 &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), | ||||
|             ) | ||||
|             .map(|op| op.is_some())?; | ||||
|  | @ -511,9 +532,10 @@ impl Rooms { | |||
|         } | ||||
| 
 | ||||
|         // We no longer keep this pdu as an outlier
 | ||||
|         if let Some(id) = self.eventid_pduid.remove(pdu.event_id().as_bytes())? { | ||||
|             self.pduid_outlierpdu.remove(id)?; | ||||
|         } | ||||
|         let mut key = pdu.room_id().as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(pdu.event_id().as_bytes()); | ||||
|         self.eventid_outlierpdu.remove(key)?; | ||||
| 
 | ||||
|         self.replace_pdu_leaves(&pdu.room_id, leaves)?; | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,7 +18,6 @@ use ruma::{ | |||
|         OutgoingRequest, | ||||
|     }, | ||||
|     directory::{IncomingFilter, IncomingRoomNetwork}, | ||||
|     events::EventType, | ||||
|     serde::to_canonical_value, | ||||
|     signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, | ||||
|     EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, | ||||
|  | @ -638,12 +637,8 @@ pub async fn send_transaction_message_route<'a>( | |||
|             None | ||||
|         }; | ||||
| 
 | ||||
|         let count = db.globals.next_count()?; | ||||
|         let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|         pdu_id.push(0xff); | ||||
|         pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||
|         // 6. persist the event as an outlier.
 | ||||
|         db.rooms.append_pdu_outlier(&pdu_id, &pdu)?; | ||||
|         db.rooms.append_pdu_outlier(&pdu)?; | ||||
| 
 | ||||
|         // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
 | ||||
|         // the checks in this list starting at 1. These are not timeline events.
 | ||||
|  | @ -1079,37 +1074,28 @@ pub(crate) async fn fetch_events( | |||
| ) -> Result<Vec<Arc<PduEvent>>> { | ||||
|     let mut pdus = vec![]; | ||||
|     for id in events { | ||||
|         // `get_pdu` checks the outliers tree for us
 | ||||
|         let pdu = match db.rooms.get_pdu(&id)? { | ||||
|             Some(pdu) => Arc::new(pdu), | ||||
|             None => match db.rooms.get_pdu_outlier(&id)? { | ||||
|                 Some(pdu) => Arc::new(pdu), | ||||
|                 None => match db | ||||
|                     .sending | ||||
|                     .send_federation_request( | ||||
|                         &db.globals, | ||||
|                         origin, | ||||
|                         get_event::v1::Request { event_id: &id }, | ||||
|                     ) | ||||
|                     .await | ||||
|                 { | ||||
|                     Ok(res) => { | ||||
|                         let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu); | ||||
|                         let (pdu, _) = | ||||
|                             validate_event(db, value, event_id, key_map, origin, auth_cache) | ||||
|                                 .await | ||||
|                                 .map_err(|_| Error::Conflict("Authentication of event failed"))?; | ||||
|             None => match db | ||||
|                 .sending | ||||
|                 .send_federation_request( | ||||
|                     &db.globals, | ||||
|                     origin, | ||||
|                     get_event::v1::Request { event_id: &id }, | ||||
|                 ) | ||||
|                 .await | ||||
|             { | ||||
|                 Ok(res) => { | ||||
|                     let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu); | ||||
|                     let (pdu, _) = validate_event(db, value, event_id, key_map, origin, auth_cache) | ||||
|                         .await | ||||
|                         .map_err(|_| Error::Conflict("Authentication of event failed"))?; | ||||
| 
 | ||||
|                         // create the pduid for this event but stick it in the outliers DB
 | ||||
|                         let count = db.globals.next_count()?; | ||||
|                         let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|                         pdu_id.push(0xff); | ||||
|                         pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||
| 
 | ||||
|                         db.rooms.append_pdu_outlier(&pdu_id, &pdu)?; | ||||
|                         pdu | ||||
|                     } | ||||
|                     Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), | ||||
|                 }, | ||||
|                     db.rooms.append_pdu_outlier(&pdu)?; | ||||
|                     pdu | ||||
|                 } | ||||
|                 Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), | ||||
|             }, | ||||
|         }; | ||||
|         pdus.push(pdu); | ||||
|  | @ -1193,7 +1179,7 @@ pub(crate) async fn calculate_forward_extremities( | |||
| 
 | ||||
| /// This should always be called after the incoming event has been appended to the DB.
 | ||||
| ///
 | ||||
| /// This guarentees that the incoming event will be in the state sets (at least our servers
 | ||||
| /// This guarantees that the incoming event will be in the state sets (at least our servers
 | ||||
| /// and the sending server).
 | ||||
| pub(crate) async fn build_forward_extremity_snapshots( | ||||
|     db: &Database, | ||||
|  | @ -1303,7 +1289,18 @@ pub(crate) fn update_resolved_state( | |||
|                     ); | ||||
|                 } | ||||
|                 None => { | ||||
|                     error!("We didn't append an event as an outlier\n{:?}", pdu); | ||||
|                     let mut pduid = pdu.room_id().as_bytes().to_vec(); | ||||
|                     pduid.push(0xff); | ||||
|                     pduid.extend_from_slice(pdu.event_id().as_bytes()); | ||||
|                     new_state.insert( | ||||
|                         ( | ||||
|                             ev_type, | ||||
|                             state_k.ok_or_else(|| { | ||||
|                                 Error::Conflict("State contained non state event") | ||||
|                             })?, | ||||
|                         ), | ||||
|                         pduid, | ||||
|                     ); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue