improvement: call /state_ids less often by using state res
This commit is contained in:
		
							parent
							
								
									9033cc86aa
								
							
						
					
					
						commit
						bd4ea14a29
					
				
					 1 changed files with 116 additions and 13 deletions
				
			
		|  | @ -964,6 +964,16 @@ pub async fn handle_incoming_pdu<'a>( | ||||||
|         return Ok(None); |         return Ok(None); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     if incoming_pdu.origin_server_ts | ||||||
|  |         < db.rooms | ||||||
|  |             .first_pdu_in_room(&room_id) | ||||||
|  |             .map_err(|_| "Error loading first room event.".to_owned())? | ||||||
|  |             .expect("Room exists") | ||||||
|  |             .origin_server_ts | ||||||
|  |     { | ||||||
|  |         return Ok(None); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 |     // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 | ||||||
|     let mut graph = HashMap::new(); |     let mut graph = HashMap::new(); | ||||||
|     let mut eventid_info = HashMap::new(); |     let mut eventid_info = HashMap::new(); | ||||||
|  | @ -1280,6 +1290,18 @@ async fn upgrade_outlier_to_timeline_pdu( | ||||||
|         return Err("Event has been soft failed".into()); |         return Err("Event has been soft failed".into()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     let create_event_content = | ||||||
|  |         serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) | ||||||
|  |             .expect("Raw::from_value always works.") | ||||||
|  |             .deserialize() | ||||||
|  |             .map_err(|e| { | ||||||
|  |                 warn!("Invalid create event: {}", e); | ||||||
|  |                 "Invalid create event in db.".to_owned() | ||||||
|  |             })?; | ||||||
|  | 
 | ||||||
|  |     let room_version_id = &create_event_content.room_version; | ||||||
|  |     let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); | ||||||
|  | 
 | ||||||
|     // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
 |     // 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
 | ||||||
|     //     doing all the checks in this list starting at 1. These are not timeline events.
 |     //     doing all the checks in this list starting at 1. These are not timeline events.
 | ||||||
| 
 | 
 | ||||||
|  | @ -1318,7 +1340,100 @@ async fn upgrade_outlier_to_timeline_pdu( | ||||||
| 
 | 
 | ||||||
|             state_at_incoming_event = Some(state); |             state_at_incoming_event = Some(state); | ||||||
|         } |         } | ||||||
|         // TODO: set incoming_auth_events?
 |     } else { | ||||||
|  |         warn!("Calculating state at event using state res"); | ||||||
|  |         let mut extremity_sstatehashes = HashMap::new(); | ||||||
|  | 
 | ||||||
|  |         let mut okay = true; | ||||||
|  |         for prev_eventid in &incoming_pdu.prev_events { | ||||||
|  |             let prev_event = if let Ok(Some(pdu)) = db.rooms.get_pdu(prev_eventid) { | ||||||
|  |                 pdu | ||||||
|  |             } else { | ||||||
|  |                 okay = false; | ||||||
|  |                 break; | ||||||
|  |             }; | ||||||
|  | 
 | ||||||
|  |             let sstatehash = if let Ok(Some(s)) = db.rooms.pdu_shortstatehash(prev_eventid) { | ||||||
|  |                 s | ||||||
|  |             } else { | ||||||
|  |                 okay = false; | ||||||
|  |                 break; | ||||||
|  |             }; | ||||||
|  | 
 | ||||||
|  |             extremity_sstatehashes.insert(sstatehash, prev_event); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let mut fork_states = Vec::new(); | ||||||
|  | 
 | ||||||
|  |         if okay { | ||||||
|  |             for (sstatehash, prev_event) in extremity_sstatehashes { | ||||||
|  |                 let mut leaf_state = db | ||||||
|  |                     .rooms | ||||||
|  |                     .state_full_ids(sstatehash) | ||||||
|  |                     .map_err(|_| "Failed to ask db for room state.".to_owned())?; | ||||||
|  | 
 | ||||||
|  |                 if let Some(state_key) = &prev_event.state_key { | ||||||
|  |                     let shortstatekey = db | ||||||
|  |                         .rooms | ||||||
|  |                         .get_or_create_shortstatekey(&prev_event.kind, state_key, &db.globals) | ||||||
|  |                         .map_err(|_| "Failed to create shortstatekey.".to_owned())?; | ||||||
|  |                     leaf_state.insert(shortstatekey, Arc::new(prev_event.event_id.clone())); | ||||||
|  |                     // Now it's the state after the pdu
 | ||||||
|  |                 } | ||||||
|  | 
 | ||||||
|  |                 fork_states.push(leaf_state); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             let fork_states = &fork_states | ||||||
|  |                 .into_iter() | ||||||
|  |                 .map(|map| { | ||||||
|  |                     map.into_iter() | ||||||
|  |                         .map(|(k, id)| (db.rooms.get_statekey_from_short(k).map(|k| (k, id)))) | ||||||
|  |                         .collect::<Result<StateMap<_>>>() | ||||||
|  |                 }) | ||||||
|  |                 .collect::<Result<Vec<_>>>() | ||||||
|  |                 .map_err(|_| "Failed to get_statekey_from_short.".to_owned())?; | ||||||
|  | 
 | ||||||
|  |             let mut auth_chain_sets = Vec::new(); | ||||||
|  |             for state in fork_states { | ||||||
|  |                 auth_chain_sets.push( | ||||||
|  |                     get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db) | ||||||
|  |                         .map_err(|_| "Failed to load auth chain.".to_owned())? | ||||||
|  |                         .collect(), | ||||||
|  |                 ); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             state_at_incoming_event = match state_res::StateResolution::resolve( | ||||||
|  |                 &room_id, | ||||||
|  |                 room_version_id, | ||||||
|  |                 &fork_states, | ||||||
|  |                 auth_chain_sets, | ||||||
|  |                 |id| { | ||||||
|  |                     let res = db.rooms.get_pdu(id); | ||||||
|  |                     if let Err(e) = &res { | ||||||
|  |                         error!("LOOK AT ME Failed to fetch event: {}", e); | ||||||
|  |                     } | ||||||
|  |                     res.ok().flatten() | ||||||
|  |                 }, | ||||||
|  |             ) { | ||||||
|  |                 Ok(new_state) => Some( | ||||||
|  |                     new_state | ||||||
|  |                         .into_iter() | ||||||
|  |                         .map(|((event_type, state_key), event_id)| { | ||||||
|  |                             let shortstatekey = db | ||||||
|  |                                 .rooms | ||||||
|  |                                 .get_or_create_shortstatekey(&event_type, &state_key, &db.globals) | ||||||
|  |                                 .map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?; | ||||||
|  |                             Ok((shortstatekey, event_id)) | ||||||
|  |                         }) | ||||||
|  |                         .collect::<StdResult<_, String>>()?, | ||||||
|  |                 ), | ||||||
|  |                 Err(e) => { | ||||||
|  |                     warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e); | ||||||
|  |                     None | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     if state_at_incoming_event.is_none() { |     if state_at_incoming_event.is_none() { | ||||||
|  | @ -1402,18 +1517,6 @@ async fn upgrade_outlier_to_timeline_pdu( | ||||||
|         state_at_incoming_event.expect("we always set this to some above"); |         state_at_incoming_event.expect("we always set this to some above"); | ||||||
| 
 | 
 | ||||||
|     // 11. Check the auth of the event passes based on the state of the event
 |     // 11. Check the auth of the event passes based on the state of the event
 | ||||||
|     let create_event_content = |  | ||||||
|         serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) |  | ||||||
|             .expect("Raw::from_value always works.") |  | ||||||
|             .deserialize() |  | ||||||
|             .map_err(|e| { |  | ||||||
|                 warn!("Invalid create event: {}", e); |  | ||||||
|                 "Invalid create event in db.".to_owned() |  | ||||||
|             })?; |  | ||||||
| 
 |  | ||||||
|     let room_version_id = &create_event_content.room_version; |  | ||||||
|     let room_version = RoomVersion::new(room_version_id).expect("room version is supported"); |  | ||||||
| 
 |  | ||||||
|     // If the previous event was the create event special rules apply
 |     // If the previous event was the create event special rules apply
 | ||||||
|     let previous_create = if incoming_pdu.auth_events.len() == 1 |     let previous_create = if incoming_pdu.auth_events.len() == 1 | ||||||
|         && incoming_pdu.prev_events == incoming_pdu.auth_events |         && incoming_pdu.prev_events == incoming_pdu.auth_events | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue