Merge branch 'fetchprev' into 'master'
improvement: try to load missing prev events See merge request famedly/conduit!152
This commit is contained in:
		
						commit
						610b4f9ad1
					
				
					 2 changed files with 71 additions and 27 deletions
				
			
		|  | @ -280,6 +280,24 @@ impl Rooms { | ||||||
|             .is_some()) |             .is_some()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     /// Checks if a room exists.
 | ||||||
|  |     pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> { | ||||||
|  |         let mut prefix = room_id.as_bytes().to_vec(); | ||||||
|  |         prefix.push(0xff); | ||||||
|  | 
 | ||||||
|  |         // Look for PDUs in that room.
 | ||||||
|  |         self.pduid_pdu | ||||||
|  |             .iter_from(&prefix, false) | ||||||
|  |             .filter(|(k, _)| k.starts_with(&prefix)) | ||||||
|  |             .map(|(_, pdu)| { | ||||||
|  |                 serde_json::from_slice(&pdu) | ||||||
|  |                     .map_err(|_| Error::bad_database("Invalid first PDU in db.")) | ||||||
|  |                     .map(Arc::new) | ||||||
|  |             }) | ||||||
|  |             .next() | ||||||
|  |             .transpose() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     /// Force the creation of a new StateHash and insert it into the db.
 |     /// Force the creation of a new StateHash and insert it into the db.
 | ||||||
|     ///
 |     ///
 | ||||||
|     /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.
 |     /// Whatever `state` is supplied to `force_state` __is__ the current room state snapshot.
 | ||||||
|  |  | ||||||
|  | @ -272,13 +272,14 @@ where | ||||||
|             if status == 200 { |             if status == 200 { | ||||||
|                 let response = T::IncomingResponse::try_from_http_response(http_response); |                 let response = T::IncomingResponse::try_from_http_response(http_response); | ||||||
|                 response.map_err(|e| { |                 response.map_err(|e| { | ||||||
|                     warn!("Invalid 200 response: {}", e); |                     warn!("Invalid 200 response from {}: {}", &destination, e); | ||||||
|                     Error::BadServerResponse("Server returned bad 200 response.") |                     Error::BadServerResponse("Server returned bad 200 response.") | ||||||
|                 }) |                 }) | ||||||
|             } else { |             } else { | ||||||
|                 Err(Error::FederationError( |                 Err(Error::FederationError( | ||||||
|                     destination.to_owned(), |                     destination.to_owned(), | ||||||
|                     RumaError::try_from_http_response(http_response).map_err(|_| { |                     RumaError::try_from_http_response(http_response).map_err(|e| { | ||||||
|  |                         warn!("Server returned bad error response: {}", e); | ||||||
|                         Error::BadServerResponse("Server returned bad error response.") |                         Error::BadServerResponse("Server returned bad error response.") | ||||||
|                     })?, |                     })?, | ||||||
|                 )) |                 )) | ||||||
|  | @ -811,7 +812,7 @@ pub async fn send_transaction_message_route( | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// An async function that can recursively call itself.
 | /// An async function that can recursively call itself.
 | ||||||
| type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E>> + 'a + Send>>; | type AsyncRecursiveType<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>; | ||||||
| 
 | 
 | ||||||
| /// When receiving an event one needs to:
 | /// When receiving an event one needs to:
 | ||||||
| /// 0. Check the server is in the room
 | /// 0. Check the server is in the room
 | ||||||
|  | @ -836,7 +837,7 @@ type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E | ||||||
| /// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
 | /// 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail"
 | ||||||
| ///     it
 | ///     it
 | ||||||
| /// 14. Use state resolution to find new room state
 | /// 14. Use state resolution to find new room state
 | ||||||
| // We use some AsyncRecursiveResult hacks here so we can call this async funtion recursively
 | // We use some AsyncRecursiveType hacks here so we can call this async funtion recursively
 | ||||||
| #[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] | #[tracing::instrument(skip(value, is_timeline_event, db, pub_key_map))] | ||||||
| pub fn handle_incoming_pdu<'a>( | pub fn handle_incoming_pdu<'a>( | ||||||
|     origin: &'a ServerName, |     origin: &'a ServerName, | ||||||
|  | @ -846,7 +847,7 @@ pub fn handle_incoming_pdu<'a>( | ||||||
|     is_timeline_event: bool, |     is_timeline_event: bool, | ||||||
|     db: &'a Database, |     db: &'a Database, | ||||||
|     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, | ||||||
| ) -> AsyncRecursiveResult<'a, Option<Vec<u8>>, String> { | ) -> AsyncRecursiveType<'a, StdResult<Option<Vec<u8>>, String>> { | ||||||
|     Box::pin(async move { |     Box::pin(async move { | ||||||
|         // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
 |         // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
 | ||||||
|         match db.rooms.exists(&room_id) { |         match db.rooms.exists(&room_id) { | ||||||
|  | @ -920,9 +921,15 @@ pub fn handle_incoming_pdu<'a>( | ||||||
|         // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
 |         // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
 | ||||||
|         // EDIT: Step 5 is not applied anymore because it failed too often
 |         // EDIT: Step 5 is not applied anymore because it failed too often
 | ||||||
|         debug!("Fetching auth events for {}", incoming_pdu.event_id); |         debug!("Fetching auth events for {}", incoming_pdu.event_id); | ||||||
|         fetch_and_handle_events(db, origin, &incoming_pdu.auth_events, &room_id, pub_key_map) |         fetch_and_handle_events( | ||||||
|             .await |             db, | ||||||
|             .map_err(|e| e.to_string())?; |             origin, | ||||||
|  |             &incoming_pdu.auth_events, | ||||||
|  |             &room_id, | ||||||
|  |             pub_key_map, | ||||||
|  |             false, | ||||||
|  |         ) | ||||||
|  |         .await; | ||||||
| 
 | 
 | ||||||
|         // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
 |         // 6. Reject "due to auth events" if the event doesn't pass auth based on the auth events
 | ||||||
|         debug!( |         debug!( | ||||||
|  | @ -1004,10 +1011,28 @@ pub fn handle_incoming_pdu<'a>( | ||||||
|         debug!("Added pdu as outlier."); |         debug!("Added pdu as outlier."); | ||||||
| 
 | 
 | ||||||
|         // 8. if not timeline event: stop
 |         // 8. if not timeline event: stop
 | ||||||
|         if !is_timeline_event { |         if !is_timeline_event | ||||||
|  |             || incoming_pdu.origin_server_ts | ||||||
|  |                 < db.rooms | ||||||
|  |                     .first_pdu_in_room(&room_id) | ||||||
|  |                     .map_err(|_| "Error loading first room event.".to_owned())? | ||||||
|  |                     .expect("Room exists") | ||||||
|  |                     .origin_server_ts | ||||||
|  |         { | ||||||
|             return Ok(None); |             return Ok(None); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  |         // Load missing prev events first
 | ||||||
|  |         fetch_and_handle_events( | ||||||
|  |             db, | ||||||
|  |             origin, | ||||||
|  |             &incoming_pdu.prev_events, | ||||||
|  |             &room_id, | ||||||
|  |             pub_key_map, | ||||||
|  |             true, | ||||||
|  |         ) | ||||||
|  |         .await; | ||||||
|  | 
 | ||||||
|         // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 |         // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 | ||||||
| 
 | 
 | ||||||
|         // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
 |         // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
 | ||||||
|  | @ -1034,9 +1059,9 @@ pub fn handle_incoming_pdu<'a>( | ||||||
|                     &state.into_iter().collect::<Vec<_>>(), |                     &state.into_iter().collect::<Vec<_>>(), | ||||||
|                     &room_id, |                     &room_id, | ||||||
|                     pub_key_map, |                     pub_key_map, | ||||||
|  |                     false, | ||||||
|                 ) |                 ) | ||||||
|                 .await |                 .await | ||||||
|                 .map_err(|_| "Failed to fetch state events locally".to_owned())? |  | ||||||
|                 .into_iter() |                 .into_iter() | ||||||
|                 .map(|pdu| { |                 .map(|pdu| { | ||||||
|                     ( |                     ( | ||||||
|  | @ -1081,18 +1106,15 @@ pub fn handle_incoming_pdu<'a>( | ||||||
|             { |             { | ||||||
|                 Ok(res) => { |                 Ok(res) => { | ||||||
|                     debug!("Fetching state events at event."); |                     debug!("Fetching state events at event."); | ||||||
|                     let state_vec = match fetch_and_handle_events( |                     let state_vec = fetch_and_handle_events( | ||||||
|                         &db, |                         &db, | ||||||
|                         origin, |                         origin, | ||||||
|                         &res.pdu_ids, |                         &res.pdu_ids, | ||||||
|                         &room_id, |                         &room_id, | ||||||
|                         pub_key_map, |                         pub_key_map, | ||||||
|  |                         false, | ||||||
|                     ) |                     ) | ||||||
|                     .await |                     .await; | ||||||
|                     { |  | ||||||
|                         Ok(state) => state, |  | ||||||
|                         Err(_) => return Err("Failed to fetch state events.".to_owned()), |  | ||||||
|                     }; |  | ||||||
| 
 | 
 | ||||||
|                     let mut state = HashMap::new(); |                     let mut state = HashMap::new(); | ||||||
|                     for pdu in state_vec { |                     for pdu in state_vec { | ||||||
|  | @ -1118,18 +1140,15 @@ pub fn handle_incoming_pdu<'a>( | ||||||
|                     } |                     } | ||||||
| 
 | 
 | ||||||
|                     debug!("Fetching auth chain events at event."); |                     debug!("Fetching auth chain events at event."); | ||||||
|                     match fetch_and_handle_events( |                     fetch_and_handle_events( | ||||||
|                         &db, |                         &db, | ||||||
|                         origin, |                         origin, | ||||||
|                         &res.auth_chain_ids, |                         &res.auth_chain_ids, | ||||||
|                         &room_id, |                         &room_id, | ||||||
|                         pub_key_map, |                         pub_key_map, | ||||||
|  |                         false, | ||||||
|                     ) |                     ) | ||||||
|                     .await |                     .await; | ||||||
|                     { |  | ||||||
|                         Ok(state) => state, |  | ||||||
|                         Err(_) => return Err("Failed to fetch auth chain.".to_owned()), |  | ||||||
|                     }; |  | ||||||
| 
 | 
 | ||||||
|                     state_at_incoming_event = Some(state); |                     state_at_incoming_event = Some(state); | ||||||
|                 } |                 } | ||||||
|  | @ -1381,7 +1400,8 @@ pub(crate) fn fetch_and_handle_events<'a>( | ||||||
|     events: &'a [EventId], |     events: &'a [EventId], | ||||||
|     room_id: &'a RoomId, |     room_id: &'a RoomId, | ||||||
|     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, |     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, | ||||||
| ) -> AsyncRecursiveResult<'a, Vec<Arc<PduEvent>>, Error> { |     are_timeline_events: bool, | ||||||
|  | ) -> AsyncRecursiveType<'a, Vec<Arc<PduEvent>>> { | ||||||
|     Box::pin(async move { |     Box::pin(async move { | ||||||
|         let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { |         let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { | ||||||
|             Entry::Vacant(e) => { |             Entry::Vacant(e) => { | ||||||
|  | @ -1408,7 +1428,12 @@ pub(crate) fn fetch_and_handle_events<'a>( | ||||||
|             // a. Look in the main timeline (pduid_pdu tree)
 |             // a. Look in the main timeline (pduid_pdu tree)
 | ||||||
|             // b. Look at outlier pdu tree
 |             // b. Look at outlier pdu tree
 | ||||||
|             // (get_pdu checks both)
 |             // (get_pdu checks both)
 | ||||||
|             let pdu = match db.rooms.get_pdu(&id) { |             let local_pdu = if are_timeline_events { | ||||||
|  |                 db.rooms.get_non_outlier_pdu(&id).map(|o| o.map(Arc::new)) | ||||||
|  |             } else { | ||||||
|  |                 db.rooms.get_pdu(&id) | ||||||
|  |             }; | ||||||
|  |             let pdu = match local_pdu { | ||||||
|                 Ok(Some(pdu)) => { |                 Ok(Some(pdu)) => { | ||||||
|                     trace!("Found {} in db", id); |                     trace!("Found {} in db", id); | ||||||
|                     pdu |                     pdu | ||||||
|  | @ -1439,7 +1464,7 @@ pub(crate) fn fetch_and_handle_events<'a>( | ||||||
|                                 &event_id, |                                 &event_id, | ||||||
|                                 &room_id, |                                 &room_id, | ||||||
|                                 value.clone(), |                                 value.clone(), | ||||||
|                                 false, |                                 are_timeline_events, | ||||||
|                                 db, |                                 db, | ||||||
|                                 pub_key_map, |                                 pub_key_map, | ||||||
|                             ) |                             ) | ||||||
|  | @ -1482,7 +1507,7 @@ pub(crate) fn fetch_and_handle_events<'a>( | ||||||
|             }; |             }; | ||||||
|             pdus.push(pdu); |             pdus.push(pdu); | ||||||
|         } |         } | ||||||
|         Ok(pdus) |         pdus | ||||||
|     }) |     }) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -2193,7 +2218,8 @@ pub async fn create_join_event_route( | ||||||
|         &pub_key_map, |         &pub_key_map, | ||||||
|     ) |     ) | ||||||
|     .await |     .await | ||||||
|     .map_err(|_| { |     .map_err(|e| { | ||||||
|  |         warn!("Error while handling incoming send join PDU: {}", e); | ||||||
|         Error::BadRequest( |         Error::BadRequest( | ||||||
|             ErrorKind::InvalidParam, |             ErrorKind::InvalidParam, | ||||||
|             "Error while handling incoming PDU.", |             "Error while handling incoming PDU.", | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue