Merge branch 'speed' into 'master'
fix: make appservices more efficient See merge request famedly/conduit!172
This commit is contained in:
		
						commit
						9033cc86aa
					
				
					 6 changed files with 171 additions and 91 deletions
				
			
		|  | @ -973,6 +973,14 @@ pub async fn invite_helper<'a>( | |||
|             (room_version_id, pdu_json, invite_room_state) | ||||
|         }; | ||||
| 
 | ||||
|         // Generate event id
 | ||||
|         let expected_event_id = EventId::try_from(&*format!( | ||||
|             "${}", | ||||
|             ruma::signatures::reference_hash(&pdu_json, &room_version_id) | ||||
|                 .expect("ruma can calculate reference hashes") | ||||
|         )) | ||||
|         .expect("ruma's reference hashes are valid event ids"); | ||||
| 
 | ||||
|         let response = db | ||||
|             .sending | ||||
|             .send_federation_request( | ||||
|  | @ -980,9 +988,9 @@ pub async fn invite_helper<'a>( | |||
|                 user_id.server_name(), | ||||
|                 create_invite::v2::Request { | ||||
|                     room_id: room_id.clone(), | ||||
|                     event_id: ruma::event_id!("$receivingservershouldsetthis"), | ||||
|                     event_id: expected_event_id.clone(), | ||||
|                     room_version: room_version_id, | ||||
|                     event: PduEvent::convert_to_outgoing_federation_event(pdu_json), | ||||
|                     event: PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()), | ||||
|                     invite_room_state, | ||||
|                 }, | ||||
|             ) | ||||
|  | @ -1002,6 +1010,10 @@ pub async fn invite_helper<'a>( | |||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         if expected_event_id != event_id { | ||||
|             warn!("Server {} changed invite event, that's not allowed in the spec: ours: {:?}, theirs: {:?}", user_id.server_name(), pdu_json, value); | ||||
|         } | ||||
| 
 | ||||
|         let origin = serde_json::from_value::<Box<ServerName>>( | ||||
|             serde_json::to_value(value.get("origin").ok_or(Error::BadRequest( | ||||
|                 ErrorKind::InvalidParam, | ||||
|  |  | |||
|  | @ -246,12 +246,6 @@ async fn sync_helper( | |||
|             .current_shortstatehash(&room_id)? | ||||
|             .expect("All rooms have state"); | ||||
| 
 | ||||
|         let pdus_after_since = db | ||||
|             .rooms | ||||
|             .pdus_after(&sender_user, &room_id, since)? | ||||
|             .next() | ||||
|             .is_some(); | ||||
| 
 | ||||
|         let since_shortstatehash = db.rooms.get_token_shortstatehash(&room_id, since)?; | ||||
| 
 | ||||
|         // Calculates joined_member_count, invited_member_count and heroes
 | ||||
|  | @ -341,7 +335,7 @@ async fn sync_helper( | |||
|                 true, | ||||
|                 state_events, | ||||
|             ) | ||||
|         } else if !pdus_after_since && since_shortstatehash == Some(current_shortstatehash) { | ||||
|         } else if timeline_pdus.len() == 0 && since_shortstatehash == Some(current_shortstatehash) { | ||||
|             // No state changes
 | ||||
|             (Vec::new(), None, None, false, Vec::new()) | ||||
|         } else { | ||||
|  | @ -401,10 +395,7 @@ async fn sync_helper( | |||
| 
 | ||||
|             let send_member_count = state_events | ||||
|                 .iter() | ||||
|                 .any(|event| event.kind == EventType::RoomMember) | ||||
|                 || timeline_pdus.iter().any(|(_, event)| { | ||||
|                     event.state_key.is_some() && event.kind == EventType::RoomMember | ||||
|                 }); | ||||
|                 .any(|event| event.kind == EventType::RoomMember); | ||||
| 
 | ||||
|             if encrypted_room { | ||||
|                 for state_event in &state_events { | ||||
|  |  | |||
|  | @ -287,8 +287,9 @@ impl Database { | |||
|                 eventidshort_cache: Mutex::new(LruCache::new(1_000_000)), | ||||
|                 shortstatekey_cache: Mutex::new(LruCache::new(1_000_000)), | ||||
|                 statekeyshort_cache: Mutex::new(LruCache::new(1_000_000)), | ||||
|                 stateinfo_cache: Mutex::new(LruCache::new(1000)), | ||||
|                 our_real_users_cache: RwLock::new(HashMap::new()), | ||||
|                 appservice_in_room_cache: RwLock::new(HashMap::new()), | ||||
|                 stateinfo_cache: Mutex::new(LruCache::new(1000)), | ||||
|             }, | ||||
|             account_data: account_data::AccountData { | ||||
|                 roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, | ||||
|  |  | |||
|  | @ -148,6 +148,7 @@ type TupleOfBytes = (Vec<u8>, Vec<u8>); | |||
| impl SqliteTable { | ||||
|     #[tracing::instrument(skip(self, guard, key))] | ||||
|     fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> { | ||||
|         //dbg!(&self.name);
 | ||||
|         Ok(guard | ||||
|             .prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())? | ||||
|             .query_row([key], |row| row.get(0)) | ||||
|  | @ -156,6 +157,7 @@ impl SqliteTable { | |||
| 
 | ||||
|     #[tracing::instrument(skip(self, guard, key, value))] | ||||
|     fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> { | ||||
|         //dbg!(&self.name);
 | ||||
|         guard.execute( | ||||
|             format!( | ||||
|                 "INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", | ||||
|  | @ -182,11 +184,16 @@ impl SqliteTable { | |||
| 
 | ||||
|         let statement_ref = NonAliasingBox(statement); | ||||
| 
 | ||||
|         //let name = self.name.clone();
 | ||||
| 
 | ||||
|         let iterator = Box::new( | ||||
|             statement | ||||
|                 .query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) | ||||
|                 .unwrap() | ||||
|                 .map(|r| r.unwrap()), | ||||
|                 .map(move |r| { | ||||
|                     //dbg!(&name);
 | ||||
|                     r.unwrap() | ||||
|                 }), | ||||
|         ); | ||||
| 
 | ||||
|         Box::new(PreparedStatementIterator { | ||||
|  | @ -294,6 +301,8 @@ impl Tree for SqliteTable { | |||
|         let guard = self.engine.read_lock_iterator(); | ||||
|         let from = from.to_vec(); // TODO change interface?
 | ||||
| 
 | ||||
|         //let name = self.name.clone();
 | ||||
| 
 | ||||
|         if backwards { | ||||
|             let statement = Box::leak(Box::new( | ||||
|                 guard | ||||
|  | @ -310,7 +319,10 @@ impl Tree for SqliteTable { | |||
|                 statement | ||||
|                     .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) | ||||
|                     .unwrap() | ||||
|                     .map(|r| r.unwrap()), | ||||
|                     .map(move |r| { | ||||
|                         //dbg!(&name);
 | ||||
|                         r.unwrap() | ||||
|                     }), | ||||
|             ); | ||||
|             Box::new(PreparedStatementIterator { | ||||
|                 iterator, | ||||
|  | @ -332,7 +344,10 @@ impl Tree for SqliteTable { | |||
|                 statement | ||||
|                     .query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) | ||||
|                     .unwrap() | ||||
|                     .map(|r| r.unwrap()), | ||||
|                     .map(move |r| { | ||||
|                         //dbg!(&name);
 | ||||
|                         r.unwrap() | ||||
|                     }), | ||||
|             ); | ||||
| 
 | ||||
|             Box::new(PreparedStatementIterator { | ||||
|  |  | |||
|  | @ -103,6 +103,7 @@ pub struct Rooms { | |||
|     pub(super) statekeyshort_cache: Mutex<LruCache<(EventType, String), u64>>, | ||||
|     pub(super) shortstatekey_cache: Mutex<LruCache<u64, (EventType, String)>>, | ||||
|     pub(super) our_real_users_cache: RwLock<HashMap<RoomId, Arc<HashSet<UserId>>>>, | ||||
|     pub(super) appservice_in_room_cache: RwLock<HashMap<RoomId, HashMap<String, bool>>>, | ||||
|     pub(super) stateinfo_cache: Mutex< | ||||
|         LruCache< | ||||
|             u64, | ||||
|  | @ -2110,6 +2111,11 @@ impl Rooms { | |||
|         } | ||||
| 
 | ||||
|         for appservice in db.appservice.all()? { | ||||
|             if self.appservice_in_room(room_id, &appservice, db)? { | ||||
|                 db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; | ||||
|                 continue; | ||||
|             } | ||||
| 
 | ||||
|             if let Some(namespaces) = appservice.1.get("namespaces") { | ||||
|                 let users = namespaces | ||||
|                     .get("users") | ||||
|  | @ -2133,17 +2139,6 @@ impl Rooms { | |||
|                     .get("rooms") | ||||
|                     .and_then(|rooms| rooms.as_sequence()); | ||||
| 
 | ||||
|                 let bridge_user_id = appservice | ||||
|                     .1 | ||||
|                     .get("sender_localpart") | ||||
|                     .and_then(|string| string.as_str()) | ||||
|                     .and_then(|string| { | ||||
|                         UserId::parse_with_server_name(string, db.globals.server_name()).ok() | ||||
|                     }); | ||||
| 
 | ||||
|                 let user_is_joined = | ||||
|                     |bridge_user_id| self.is_joined(&bridge_user_id, room_id).unwrap_or(false); | ||||
| 
 | ||||
|                 let matching_users = |users: &Regex| { | ||||
|                     users.is_match(pdu.sender.as_str()) | ||||
|                         || pdu.kind == EventType::RoomMember | ||||
|  | @ -2151,9 +2146,6 @@ impl Rooms { | |||
|                                 .state_key | ||||
|                                 .as_ref() | ||||
|                                 .map_or(false, |state_key| users.is_match(&state_key)) | ||||
|                         || self.room_members(&room_id).any(|userid| { | ||||
|                             userid.map_or(false, |userid| users.is_match(userid.as_str())) | ||||
|                         }) | ||||
|                 }; | ||||
|                 let matching_aliases = |aliases: &Regex| { | ||||
|                     self.room_aliases(&room_id) | ||||
|  | @ -2161,8 +2153,7 @@ impl Rooms { | |||
|                         .any(|room_alias| aliases.is_match(room_alias.as_str())) | ||||
|                 }; | ||||
| 
 | ||||
|                 if bridge_user_id.map_or(false, user_is_joined) | ||||
|                     || aliases.iter().any(matching_aliases) | ||||
|                 if aliases.iter().any(matching_aliases) | ||||
|                     || rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into())) | ||||
|                     || users.iter().any(matching_users) | ||||
|                 { | ||||
|  | @ -2579,6 +2570,11 @@ impl Rooms { | |||
|             self.serverroomids.insert(&serverroom_id, &[])?; | ||||
|         } | ||||
| 
 | ||||
|         self.appservice_in_room_cache | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .remove(room_id); | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|  | @ -2608,6 +2604,65 @@ impl Rooms { | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip(self, room_id, appservice, db))] | ||||
|     pub fn appservice_in_room( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|         appservice: &(String, serde_yaml::Value), | ||||
|         db: &Database, | ||||
|     ) -> Result<bool> { | ||||
|         let maybe = self | ||||
|             .appservice_in_room_cache | ||||
|             .read() | ||||
|             .unwrap() | ||||
|             .get(room_id) | ||||
|             .and_then(|map| map.get(&appservice.0)) | ||||
|             .copied(); | ||||
| 
 | ||||
|         if let Some(b) = maybe { | ||||
|             Ok(b) | ||||
|         } else { | ||||
|             if let Some(namespaces) = appservice.1.get("namespaces") { | ||||
|                 let users = namespaces | ||||
|                     .get("users") | ||||
|                     .and_then(|users| users.as_sequence()) | ||||
|                     .map_or_else(Vec::new, |users| { | ||||
|                         users | ||||
|                             .iter() | ||||
|                             .filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok()) | ||||
|                             .collect::<Vec<_>>() | ||||
|                     }); | ||||
| 
 | ||||
|                 let bridge_user_id = appservice | ||||
|                     .1 | ||||
|                     .get("sender_localpart") | ||||
|                     .and_then(|string| string.as_str()) | ||||
|                     .and_then(|string| { | ||||
|                         UserId::parse_with_server_name(string, db.globals.server_name()).ok() | ||||
|                     }); | ||||
| 
 | ||||
|                 let in_room = bridge_user_id | ||||
|                     .map_or(false, |id| self.is_joined(&id, room_id).unwrap_or(false)) | ||||
|                     || self.room_members(&room_id).any(|userid| { | ||||
|                         userid.map_or(false, |userid| { | ||||
|                             users.iter().any(|r| r.is_match(userid.as_str())) | ||||
|                         }) | ||||
|                     }); | ||||
| 
 | ||||
|                 self.appservice_in_room_cache | ||||
|                     .write() | ||||
|                     .unwrap() | ||||
|                     .entry(room_id.clone()) | ||||
|                     .or_default() | ||||
|                     .insert(appservice.0.clone(), in_room); | ||||
| 
 | ||||
|                 Ok(in_room) | ||||
|             } else { | ||||
|                 Ok(false) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip(self, db))] | ||||
|     pub async fn leave_room( | ||||
|         &self, | ||||
|  |  | |||
|  | @ -48,7 +48,7 @@ use ruma::{ | |||
|     state_res::{self, RoomVersion, StateMap}, | ||||
|     to_device::DeviceIdOrAllDevices, | ||||
|     uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, | ||||
|     ServerSigningKeyId, UserId, | ||||
|     ServerSigningKeyId, | ||||
| }; | ||||
| use std::{ | ||||
|     collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet}, | ||||
|  | @ -1497,6 +1497,47 @@ async fn upgrade_outlier_to_timeline_pdu( | |||
|         ) | ||||
|         .map_err(|_| "Failed to get_auth_events.".to_owned())?; | ||||
| 
 | ||||
|     let state_ids_compressed = state_at_incoming_event | ||||
|         .iter() | ||||
|         .map(|(shortstatekey, id)| { | ||||
|             db.rooms | ||||
|                 .compress_state_event(*shortstatekey, &id, &db.globals) | ||||
|                 .map_err(|_| "Failed to compress_state_event".to_owned()) | ||||
|         }) | ||||
|         .collect::<StdResult<_, String>>()?; | ||||
| 
 | ||||
|     // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
 | ||||
|     debug!("starting soft fail auth check"); | ||||
| 
 | ||||
|     let soft_fail = !state_res::event_auth::auth_check( | ||||
|         &room_version, | ||||
|         &incoming_pdu, | ||||
|         previous_create, | ||||
|         None, | ||||
|         |k, s| auth_events.get(&(k.clone(), s.to_owned())).map(Arc::clone), | ||||
|     ) | ||||
|     .map_err(|_e| "Auth check failed.".to_owned())?; | ||||
| 
 | ||||
|     if soft_fail { | ||||
|         append_incoming_pdu( | ||||
|             &db, | ||||
|             &incoming_pdu, | ||||
|             val, | ||||
|             extremities, | ||||
|             state_ids_compressed, | ||||
|             soft_fail, | ||||
|             &state_lock, | ||||
|         ) | ||||
|         .map_err(|_| "Failed to add pdu to db.".to_owned())?; | ||||
| 
 | ||||
|         // Soft fail, we keep the event as an outlier but don't add it to the timeline
 | ||||
|         warn!("Event was soft failed: {:?}", incoming_pdu); | ||||
|         db.rooms | ||||
|             .mark_event_soft_failed(&incoming_pdu.event_id) | ||||
|             .map_err(|_| "Failed to set soft failed flag".to_owned())?; | ||||
|         return Err("Event has been soft failed".into()); | ||||
|     } | ||||
| 
 | ||||
|     if incoming_pdu.state_key.is_some() { | ||||
|         let mut extremity_sstatehashes = HashMap::new(); | ||||
| 
 | ||||
|  | @ -1651,31 +1692,10 @@ async fn upgrade_outlier_to_timeline_pdu( | |||
| 
 | ||||
|     extremities.insert(incoming_pdu.event_id.clone()); | ||||
| 
 | ||||
|     // 13. Check if the event passes auth based on the "current state" of the room, if not "soft fail" it
 | ||||
|     debug!("starting soft fail auth check"); | ||||
| 
 | ||||
|     let soft_fail = !state_res::event_auth::auth_check( | ||||
|         &room_version, | ||||
|         &incoming_pdu, | ||||
|         previous_create, | ||||
|         None, | ||||
|         |k, s| auth_events.get(&(k.clone(), s.to_owned())).map(Arc::clone), | ||||
|     ) | ||||
|     .map_err(|_e| "Auth check failed.".to_owned())?; | ||||
| 
 | ||||
|     // Now that the event has passed all auth it is added into the timeline.
 | ||||
|     // We use the `state_at_event` instead of `state_after` so we accurately
 | ||||
|     // represent the state for this event.
 | ||||
| 
 | ||||
|     let state_ids_compressed = state_at_incoming_event | ||||
|         .iter() | ||||
|         .map(|(shortstatekey, id)| { | ||||
|             db.rooms | ||||
|                 .compress_state_event(*shortstatekey, &id, &db.globals) | ||||
|                 .map_err(|_| "Failed to compress_state_event".to_owned()) | ||||
|         }) | ||||
|         .collect::<StdResult<_, String>>()?; | ||||
| 
 | ||||
|     let pdu_id = append_incoming_pdu( | ||||
|         &db, | ||||
|         &incoming_pdu, | ||||
|  | @ -1689,15 +1709,6 @@ async fn upgrade_outlier_to_timeline_pdu( | |||
| 
 | ||||
|     debug!("Appended incoming pdu."); | ||||
| 
 | ||||
|     if soft_fail { | ||||
|         // Soft fail, we keep the event as an outlier but don't add it to the timeline
 | ||||
|         warn!("Event was soft failed: {:?}", incoming_pdu); | ||||
|         db.rooms | ||||
|             .mark_event_soft_failed(&incoming_pdu.event_id) | ||||
|             .map_err(|_| "Failed to set soft failed flag".to_owned())?; | ||||
|         return Err("Event has been soft failed".into()); | ||||
|     } | ||||
| 
 | ||||
|     // Event has passed all auth/stateres checks
 | ||||
|     drop(state_lock); | ||||
|     Ok(pdu_id) | ||||
|  | @ -2017,6 +2028,11 @@ fn append_incoming_pdu( | |||
|     )?; | ||||
| 
 | ||||
|     for appservice in db.appservice.all()? { | ||||
|         if db.rooms.appservice_in_room(&pdu.room_id, &appservice, db)? { | ||||
|             db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; | ||||
|             continue; | ||||
|         } | ||||
| 
 | ||||
|         if let Some(namespaces) = appservice.1.get("namespaces") { | ||||
|             let users = namespaces | ||||
|                 .get("users") | ||||
|  | @ -2029,45 +2045,35 @@ fn append_incoming_pdu( | |||
|                 }); | ||||
|             let aliases = namespaces | ||||
|                 .get("aliases") | ||||
|                 .and_then(|users| users.get("regex")) | ||||
|                 .and_then(|regex| regex.as_str()) | ||||
|                 .and_then(|regex| Regex::new(regex).ok()); | ||||
|                 .and_then(|aliases| aliases.as_sequence()) | ||||
|                 .map_or_else(Vec::new, |aliases| { | ||||
|                     aliases | ||||
|                         .iter() | ||||
|                         .filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok()) | ||||
|                         .collect::<Vec<_>>() | ||||
|                 }); | ||||
|             let rooms = namespaces | ||||
|                 .get("rooms") | ||||
|                 .and_then(|rooms| rooms.as_sequence()); | ||||
| 
 | ||||
|             let room_aliases = db.rooms.room_aliases(&pdu.room_id); | ||||
| 
 | ||||
|             let bridge_user_id = appservice | ||||
|                 .1 | ||||
|                 .get("sender_localpart") | ||||
|                 .and_then(|string| string.as_str()) | ||||
|                 .and_then(|string| { | ||||
|                     UserId::parse_with_server_name(string, db.globals.server_name()).ok() | ||||
|                 }); | ||||
| 
 | ||||
|             #[allow(clippy::blocks_in_if_conditions)] | ||||
|             if bridge_user_id.map_or(false, |bridge_user_id| { | ||||
|                 db.rooms | ||||
|                     .is_joined(&bridge_user_id, &pdu.room_id) | ||||
|                     .unwrap_or(false) | ||||
|             }) || users.iter().any(|users| { | ||||
|             let matching_users = |users: &Regex| { | ||||
|                 users.is_match(pdu.sender.as_str()) | ||||
|                     || pdu.kind == EventType::RoomMember | ||||
|                         && pdu | ||||
|                             .state_key | ||||
|                             .as_ref() | ||||
|                             .map_or(false, |state_key| users.is_match(&state_key)) | ||||
|             }) || aliases.map_or(false, |aliases| { | ||||
|                 room_aliases | ||||
|             }; | ||||
|             let matching_aliases = |aliases: &Regex| { | ||||
|                 db.rooms | ||||
|                     .room_aliases(&pdu.room_id) | ||||
|                     .filter_map(|r| r.ok()) | ||||
|                     .any(|room_alias| aliases.is_match(room_alias.as_str())) | ||||
|             }) || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) | ||||
|                 || db | ||||
|                     .rooms | ||||
|                     .room_members(&pdu.room_id) | ||||
|                     .filter_map(|r| r.ok()) | ||||
|                     .any(|member| users.iter().any(|regex| regex.is_match(member.as_str()))) | ||||
|             }; | ||||
| 
 | ||||
|             if aliases.iter().any(matching_aliases) | ||||
|                 || rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into())) | ||||
|                 || users.iter().any(matching_users) | ||||
|             { | ||||
|                 db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; | ||||
|             } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue