improvement: better prev event fetching, perf improvements
This commit is contained in:
		
							parent
							
								
									75ba8bb565
								
							
						
					
					
						commit
						bf7e019a68
					
				
					 4 changed files with 202 additions and 69 deletions
				
			
		|  | @ -733,7 +733,7 @@ pub async fn deactivate_route( | |||
| pub async fn third_party_route( | ||||
|     body: Ruma<get_contacts::Request>, | ||||
| ) -> ConduitResult<get_contacts::Response> { | ||||
|     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
|     let _sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
| 
 | ||||
|     Ok(get_contacts::Response::new(Vec::new()).into()) | ||||
| } | ||||
|  |  | |||
|  | @ -278,6 +278,8 @@ impl Database { | |||
|                 pdu_cache: Mutex::new(LruCache::new(100_000)), | ||||
|                 auth_chain_cache: Mutex::new(LruCache::new(100_000)), | ||||
|                 shorteventid_cache: Mutex::new(LruCache::new(1_000_000)), | ||||
|                 eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), | ||||
|                 statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), | ||||
|                 stateinfo_cache: Mutex::new(LruCache::new(1000)), | ||||
|             }, | ||||
|             account_data: account_data::AccountData { | ||||
|  |  | |||
|  | @ -92,6 +92,8 @@ pub struct Rooms { | |||
|     pub(super) pdu_cache: Mutex<LruCache<EventId, Arc<PduEvent>>>, | ||||
|     pub(super) auth_chain_cache: Mutex<LruCache<u64, HashSet<u64>>>, | ||||
|     pub(super) shorteventid_cache: Mutex<LruCache<u64, EventId>>, | ||||
|     pub(super) eventidshort_cache: Mutex<LruCache<EventId, u64>>, | ||||
|     pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>, | ||||
|     pub(super) stateinfo_cache: Mutex< | ||||
|         LruCache< | ||||
|             u64, | ||||
|  | @ -665,7 +667,11 @@ impl Rooms { | |||
|         event_id: &EventId, | ||||
|         globals: &super::globals::Globals, | ||||
|     ) -> Result<u64> { | ||||
|         Ok(match self.eventid_shorteventid.get(event_id.as_bytes())? { | ||||
|         if let Some(short) = self.eventidshort_cache.lock().unwrap().get_mut(&event_id) { | ||||
|             return Ok(*short); | ||||
|         } | ||||
| 
 | ||||
|         let short = match self.eventid_shorteventid.get(event_id.as_bytes())? { | ||||
|             Some(shorteventid) => utils::u64_from_bytes(&shorteventid) | ||||
|                 .map_err(|_| Error::bad_database("Invalid shorteventid in db."))?, | ||||
|             None => { | ||||
|  | @ -676,7 +682,14 @@ impl Rooms { | |||
|                     .insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; | ||||
|                 shorteventid | ||||
|             } | ||||
|         }) | ||||
|         }; | ||||
| 
 | ||||
|         self.eventidshort_cache | ||||
|             .lock() | ||||
|             .unwrap() | ||||
|             .insert(event_id.clone(), short); | ||||
| 
 | ||||
|         Ok(short) | ||||
|     } | ||||
| 
 | ||||
|     pub fn get_shortroomid(&self, room_id: &RoomId) -> Result<Option<u64>> { | ||||
|  | @ -694,17 +707,36 @@ impl Rooms { | |||
|         event_type: &EventType, | ||||
|         state_key: &str, | ||||
|     ) -> Result<Option<u64>> { | ||||
|         if let Some(short) = self | ||||
|             .statekeyshort_cache | ||||
|             .lock() | ||||
|             .unwrap() | ||||
|             .get_mut(&(event_type.clone(), state_key.to_owned())) | ||||
|         { | ||||
|             return Ok(Some(*short)); | ||||
|         } | ||||
| 
 | ||||
|         let mut statekey = event_type.as_ref().as_bytes().to_vec(); | ||||
|         statekey.push(0xff); | ||||
|         statekey.extend_from_slice(&state_key.as_bytes()); | ||||
| 
 | ||||
|         self.statekey_shortstatekey | ||||
|         let short = self | ||||
|             .statekey_shortstatekey | ||||
|             .get(&statekey)? | ||||
|             .map(|shortstatekey| { | ||||
|                 utils::u64_from_bytes(&shortstatekey) | ||||
|                     .map_err(|_| Error::bad_database("Invalid shortstatekey in db.")) | ||||
|             }) | ||||
|             .transpose() | ||||
|             .transpose()?; | ||||
| 
 | ||||
|         if let Some(s) = short { | ||||
|             self.statekeyshort_cache | ||||
|                 .lock() | ||||
|                 .unwrap() | ||||
|                 .insert((event_type.clone(), state_key.to_owned()), s); | ||||
|         } | ||||
| 
 | ||||
|         Ok(short) | ||||
|     } | ||||
| 
 | ||||
|     pub fn get_or_create_shortroomid( | ||||
|  | @ -730,11 +762,20 @@ impl Rooms { | |||
|         state_key: &str, | ||||
|         globals: &super::globals::Globals, | ||||
|     ) -> Result<u64> { | ||||
|         if let Some(short) = self | ||||
|             .statekeyshort_cache | ||||
|             .lock() | ||||
|             .unwrap() | ||||
|             .get_mut(&(event_type.clone(), state_key.to_owned())) | ||||
|         { | ||||
|             return Ok(*short); | ||||
|         } | ||||
| 
 | ||||
|         let mut statekey = event_type.as_ref().as_bytes().to_vec(); | ||||
|         statekey.push(0xff); | ||||
|         statekey.extend_from_slice(&state_key.as_bytes()); | ||||
| 
 | ||||
|         Ok(match self.statekey_shortstatekey.get(&statekey)? { | ||||
|         let short = match self.statekey_shortstatekey.get(&statekey)? { | ||||
|             Some(shortstatekey) => utils::u64_from_bytes(&shortstatekey) | ||||
|                 .map_err(|_| Error::bad_database("Invalid shortstatekey in db."))?, | ||||
|             None => { | ||||
|  | @ -743,7 +784,14 @@ impl Rooms { | |||
|                     .insert(&statekey, &shortstatekey.to_be_bytes())?; | ||||
|                 shortstatekey | ||||
|             } | ||||
|         }) | ||||
|         }; | ||||
| 
 | ||||
|         self.statekeyshort_cache | ||||
|             .lock() | ||||
|             .unwrap() | ||||
|             .insert((event_type.clone(), state_key.to_owned()), short); | ||||
| 
 | ||||
|         Ok(short) | ||||
|     } | ||||
| 
 | ||||
|     pub fn get_eventid_from_short(&self, shorteventid: u64) -> Result<EventId> { | ||||
|  | @ -2173,8 +2221,10 @@ impl Rooms { | |||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 if update_joined_count { | ||||
|                     self.roomserverids.insert(&roomserver_id, &[])?; | ||||
|                     self.serverroomids.insert(&serverroom_id, &[])?; | ||||
|                 } | ||||
|                 self.userroomid_joined.insert(&userroom_id, &[])?; | ||||
|                 self.roomuserid_joined.insert(&roomuser_id, &[])?; | ||||
|                 self.userroomid_invitestate.remove(&userroom_id)?; | ||||
|  | @ -2199,8 +2249,10 @@ impl Rooms { | |||
|                     return Ok(()); | ||||
|                 } | ||||
| 
 | ||||
|                 if update_joined_count { | ||||
|                     self.roomserverids.insert(&roomserver_id, &[])?; | ||||
|                     self.serverroomids.insert(&serverroom_id, &[])?; | ||||
|                 } | ||||
|                 self.userroomid_invitestate.insert( | ||||
|                     &userroom_id, | ||||
|                     &serde_json::to_vec(&last_state.unwrap_or_default()) | ||||
|  | @ -2214,6 +2266,7 @@ impl Rooms { | |||
|                 self.roomuserid_leftcount.remove(&roomuser_id)?; | ||||
|             } | ||||
|             member::MembershipState::Leave | member::MembershipState::Ban => { | ||||
|                 if update_joined_count { | ||||
|                     if self | ||||
|                         .room_members(room_id) | ||||
|                         .chain(self.room_members_invited(room_id)) | ||||
|  | @ -2223,6 +2276,7 @@ impl Rooms { | |||
|                         self.roomserverids.remove(&roomserver_id)?; | ||||
|                         self.serverroomids.remove(&serverroom_id)?; | ||||
|                     } | ||||
|                 } | ||||
|                 self.userroomid_leftstate.insert( | ||||
|                     &userroom_id, | ||||
|                     &serde_json::to_vec(&Vec::<Raw<AnySyncStateEvent>>::new()).unwrap(), | ||||
|  | @ -2245,10 +2299,52 @@ impl Rooms { | |||
|     } | ||||
| 
 | ||||
|     pub fn update_joined_count(&self, room_id: &RoomId) -> Result<()> { | ||||
|         self.roomid_joinedcount.insert( | ||||
|             room_id.as_bytes(), | ||||
|             &(self.room_members(&room_id).count() as u64).to_be_bytes(), | ||||
|         ) | ||||
|         let mut joinedcount = 0_u64; | ||||
|         let mut joined_servers = HashSet::new(); | ||||
| 
 | ||||
|         for joined in self.room_members(&room_id).filter_map(|r| r.ok()) { | ||||
|             joined_servers.insert(joined.server_name().to_owned()); | ||||
|             joinedcount += 1; | ||||
|         } | ||||
| 
 | ||||
|         for invited in self.room_members_invited(&room_id).filter_map(|r| r.ok()) { | ||||
|             joined_servers.insert(invited.server_name().to_owned()); | ||||
|         } | ||||
| 
 | ||||
|         self.roomid_joinedcount | ||||
|             .insert(room_id.as_bytes(), &joinedcount.to_be_bytes())?; | ||||
| 
 | ||||
|         for old_joined_server in self.room_servers(room_id).filter_map(|r| r.ok()) { | ||||
|             if !joined_servers.remove(&old_joined_server) { | ||||
|                 // Server not in room anymore
 | ||||
|                 let mut roomserver_id = room_id.as_bytes().to_vec(); | ||||
|                 roomserver_id.push(0xff); | ||||
|                 roomserver_id.extend_from_slice(old_joined_server.as_bytes()); | ||||
| 
 | ||||
|                 let mut serverroom_id = old_joined_server.as_bytes().to_vec(); | ||||
|                 serverroom_id.push(0xff); | ||||
|                 serverroom_id.extend_from_slice(room_id.as_bytes()); | ||||
| 
 | ||||
|                 self.roomserverids.remove(&roomserver_id)?; | ||||
|                 self.serverroomids.remove(&serverroom_id)?; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // Now only new servers are in joined_servers anymore
 | ||||
|         for server in joined_servers { | ||||
|             let mut roomserver_id = room_id.as_bytes().to_vec(); | ||||
|             roomserver_id.push(0xff); | ||||
|             roomserver_id.extend_from_slice(server.as_bytes()); | ||||
| 
 | ||||
|             let mut serverroom_id = server.as_bytes().to_vec(); | ||||
|             serverroom_id.push(0xff); | ||||
|             serverroom_id.extend_from_slice(room_id.as_bytes()); | ||||
| 
 | ||||
|             self.roomserverids.insert(&roomserver_id, &[])?; | ||||
|             self.serverroomids.insert(&serverroom_id, &[])?; | ||||
|         } | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn leave_room( | ||||
|  |  | |||
|  | @ -272,14 +272,20 @@ where | |||
|             if status == 200 { | ||||
|                 let response = T::IncomingResponse::try_from_http_response(http_response); | ||||
|                 response.map_err(|e| { | ||||
|                     warn!("Invalid 200 response from {}: {}", &destination, e); | ||||
|                     warn!( | ||||
|                         "Invalid 200 response from {} on: {} {}", | ||||
|                         &destination, url, e | ||||
|                     ); | ||||
|                     Error::BadServerResponse("Server returned bad 200 response.") | ||||
|                 }) | ||||
|             } else { | ||||
|                 Err(Error::FederationError( | ||||
|                     destination.to_owned(), | ||||
|                     RumaError::try_from_http_response(http_response).map_err(|e| { | ||||
|                         warn!("Server returned bad error response: {}", e); | ||||
|                         warn!( | ||||
|                             "Invalid {} response from {} on: {} {}", | ||||
|                             status, &destination, url, e | ||||
|                         ); | ||||
|                         Error::BadServerResponse("Server returned bad error response.") | ||||
|                     })?, | ||||
|                 )) | ||||
|  | @ -884,15 +890,10 @@ pub async fn handle_incoming_pdu<'a>( | |||
|     } | ||||
| 
 | ||||
|     // 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 | ||||
|     let mut visited = HashSet::new(); | ||||
|     let mut graph = HashMap::new(); | ||||
|     let mut eventid_info = HashMap::new(); | ||||
|     let mut todo_outlier_stack = incoming_pdu.prev_events.clone(); | ||||
|     let mut todo_timeline_stack = Vec::new(); | ||||
|     while let Some(prev_event_id) = todo_outlier_stack.pop() { | ||||
|         if visited.contains(&prev_event_id) { | ||||
|             continue; | ||||
|         } | ||||
|         visited.insert(prev_event_id.clone()); | ||||
| 
 | ||||
|         if let Some((pdu, json_opt)) = fetch_and_handle_outliers( | ||||
|             db, | ||||
|             origin, | ||||
|  | @ -914,17 +915,50 @@ pub async fn handle_incoming_pdu<'a>( | |||
|                         .expect("Room exists") | ||||
|                         .origin_server_ts | ||||
|                 { | ||||
|                     todo_outlier_stack.extend(pdu.prev_events.iter().cloned()); | ||||
|                     todo_timeline_stack.push((pdu, json)); | ||||
|                     for prev_prev in &pdu.prev_events { | ||||
|                         if !graph.contains_key(prev_prev) { | ||||
|                             todo_outlier_stack.push(dbg!(prev_prev.clone())); | ||||
|                         } | ||||
|                     } | ||||
| 
 | ||||
|                     graph.insert( | ||||
|                         prev_event_id.clone(), | ||||
|                         pdu.prev_events.iter().cloned().collect(), | ||||
|                     ); | ||||
|                     eventid_info.insert(prev_event_id.clone(), (pdu, json)); | ||||
|                 } else { | ||||
|                     graph.insert(prev_event_id.clone(), HashSet::new()); | ||||
|                     eventid_info.insert(prev_event_id.clone(), (pdu, json)); | ||||
|                 } | ||||
|             } else { | ||||
|                 graph.insert(prev_event_id.clone(), HashSet::new()); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     while let Some(prev) = todo_timeline_stack.pop() { | ||||
|     let sorted = | ||||
|         state_res::StateResolution::lexicographical_topological_sort(dbg!(&graph), |event_id| { | ||||
|             // This return value is the key used for sorting events,
 | ||||
|             // events are then sorted by power level, time,
 | ||||
|             // and lexically by event_id.
 | ||||
|             println!("{}", event_id); | ||||
|             Ok(( | ||||
|                 0, | ||||
|                 MilliSecondsSinceUnixEpoch( | ||||
|                     eventid_info | ||||
|                         .get(event_id) | ||||
|                         .map_or_else(|| uint!(0), |info| info.0.origin_server_ts.clone()), | ||||
|                 ), | ||||
|                 ruma::event_id!("$notimportant"), | ||||
|             )) | ||||
|         }) | ||||
|         .map_err(|_| "Error sorting prev events".to_owned())?; | ||||
| 
 | ||||
|     for prev_id in dbg!(sorted) { | ||||
|         if let Some((pdu, json)) = eventid_info.remove(&prev_id) { | ||||
|             upgrade_outlier_to_timeline_pdu( | ||||
|             prev.0, | ||||
|             prev.1, | ||||
|                 pdu, | ||||
|                 json, | ||||
|                 &create_event, | ||||
|                 origin, | ||||
|                 db, | ||||
|  | @ -933,6 +967,7 @@ pub async fn handle_incoming_pdu<'a>( | |||
|             ) | ||||
|             .await?; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     upgrade_outlier_to_timeline_pdu( | ||||
|         incoming_pdu, | ||||
|  | @ -1872,8 +1907,7 @@ fn get_auth_chain( | |||
|             full_auth_chain.extend(cached.iter().cloned()); | ||||
|         } else { | ||||
|             drop(cache); | ||||
|             let mut auth_chain = HashSet::new(); | ||||
|             get_auth_chain_recursive(&event_id, &mut auth_chain, db)?; | ||||
|             let auth_chain = get_auth_chain_inner(&event_id, db)?; | ||||
|             cache = db.rooms.auth_chain_cache(); | ||||
|             cache.insert(sevent_id, auth_chain.clone()); | ||||
|             full_auth_chain.extend(auth_chain); | ||||
|  | @ -1887,33 +1921,34 @@ fn get_auth_chain( | |||
|         .filter_map(move |sid| db.rooms.get_eventid_from_short(sid).ok())) | ||||
| } | ||||
| 
 | ||||
| fn get_auth_chain_recursive( | ||||
|     event_id: &EventId, | ||||
|     found: &mut HashSet<u64>, | ||||
|     db: &Database, | ||||
| ) -> Result<()> { | ||||
|     let r = db.rooms.get_pdu(&event_id); | ||||
|     match r { | ||||
| fn get_auth_chain_inner(event_id: &EventId, db: &Database) -> Result<HashSet<u64>> { | ||||
|     let mut todo = vec![event_id.clone()]; | ||||
|     let mut found = HashSet::new(); | ||||
| 
 | ||||
|     while let Some(event_id) = todo.pop() { | ||||
|         match db.rooms.get_pdu(&event_id) { | ||||
|             Ok(Some(pdu)) => { | ||||
|                 for auth_event in &pdu.auth_events { | ||||
|                     let sauthevent = db | ||||
|                         .rooms | ||||
|                         .get_or_create_shorteventid(auth_event, &db.globals)?; | ||||
| 
 | ||||
|                     if !found.contains(&sauthevent) { | ||||
|                         found.insert(sauthevent); | ||||
|                     get_auth_chain_recursive(&auth_event, found, db)?; | ||||
|                         todo.push(auth_event.clone()); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             Ok(None) => { | ||||
|             warn!("Could not find pdu mentioned in auth events."); | ||||
|                 warn!("Could not find pdu mentioned in auth events: {}", event_id); | ||||
|             } | ||||
|             Err(e) => { | ||||
|             warn!("Could not load event in auth chain: {}", e); | ||||
|                 warn!("Could not load event in auth chain: {} {}", event_id, e); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
|     Ok(found) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue