improvement: better appservice compatibility and optimizations
This commit is contained in:
		
							parent
							
								
									fd3fb3a7ed
								
							
						
					
					
						commit
						6924dfc8ea
					
				
					 9 changed files with 270 additions and 207 deletions
				
			
		|  | @ -1,6 +1,6 @@ | |||
| use crate::{utils, Error, Result}; | ||||
| use http::header::{HeaderValue, CONTENT_TYPE}; | ||||
| use log::warn; | ||||
| use log::{info, warn}; | ||||
| use ruma::api::OutgoingRequest; | ||||
| use std::{ | ||||
|     convert::{TryFrom, TryInto}, | ||||
|  |  | |||
|  | @ -234,7 +234,7 @@ pub async fn get_state_events_for_empty_key_route( | |||
|         .1; | ||||
| 
 | ||||
|     Ok(get_state_events_for_empty_key::Response { | ||||
|         content: serde_json::value::to_raw_value(&event) | ||||
|         content: serde_json::value::to_raw_value(&event.content) | ||||
|             .map_err(|_| Error::bad_database("Invalid event content in database"))?, | ||||
|     } | ||||
|     .into()) | ||||
|  |  | |||
|  | @ -95,15 +95,7 @@ pub async fn sync_events_route( | |||
| 
 | ||||
|         // Database queries:
 | ||||
| 
 | ||||
|         let current_state = db.rooms.room_state_full(&room_id)?; | ||||
|         let current_members = current_state | ||||
|             .iter() | ||||
|             .filter(|(key, _)| key.0 == EventType::RoomMember) | ||||
|             .map(|(key, value)| (&key.1, value)) // Only keep state key
 | ||||
|             .collect::<Vec<_>>(); | ||||
|         let encrypted_room = current_state | ||||
|             .get(&(EventType::RoomEncryption, "".to_owned())) | ||||
|             .is_some(); | ||||
|         let current_state_hash = db.rooms.current_state_hash(&room_id)?; | ||||
| 
 | ||||
|         // These type is Option<Option<_>>. The outer Option is None when there is no event between
 | ||||
|         // since and the current room state, meaning there should be no updates.
 | ||||
|  | @ -115,69 +107,85 @@ pub async fn sync_events_route( | |||
|             .as_ref() | ||||
|             .map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?); | ||||
| 
 | ||||
|         let since_state = since_state_hash.as_ref().map(|state_hash| { | ||||
|             state_hash | ||||
|                 .as_ref() | ||||
|                 .and_then(|state_hash| db.rooms.state_full(&room_id, &state_hash).ok()) | ||||
|         }); | ||||
|         let ( | ||||
|             heroes, | ||||
|             joined_member_count, | ||||
|             invited_member_count, | ||||
|             joined_since_last_sync, | ||||
|             state_events, | ||||
|         ) = if since_state_hash != None && Some(¤t_state_hash) != since_state_hash.as_ref() { | ||||
|             let current_state = db.rooms.room_state_full(&room_id)?; | ||||
|             let current_members = current_state | ||||
|                 .iter() | ||||
|                 .filter(|(key, _)| key.0 == EventType::RoomMember) | ||||
|                 .map(|(key, value)| (&key.1, value)) // Only keep state key
 | ||||
|                 .collect::<Vec<_>>(); | ||||
|             let encrypted_room = current_state | ||||
|                 .get(&(EventType::RoomEncryption, "".to_owned())) | ||||
|                 .is_some(); | ||||
|             let since_state = since_state_hash.as_ref().map(|state_hash| { | ||||
|                 state_hash | ||||
|                     .as_ref() | ||||
|                     .and_then(|state_hash| db.rooms.state_full(&room_id, &state_hash).ok()) | ||||
|             }); | ||||
| 
 | ||||
|         let since_encryption = since_state.as_ref().map(|state| { | ||||
|             state | ||||
|                 .as_ref() | ||||
|                 .map(|state| state.get(&(EventType::RoomEncryption, "".to_owned()))) | ||||
|         }); | ||||
| 
 | ||||
|         // Calculations:
 | ||||
|         let new_encrypted_room = | ||||
|             encrypted_room && since_encryption.map_or(false, |encryption| encryption.is_none()); | ||||
| 
 | ||||
|         let send_member_count = since_state.as_ref().map_or(false, |since_state| { | ||||
|             since_state.as_ref().map_or(true, |since_state| { | ||||
|                 current_members.len() | ||||
|                     != since_state | ||||
|                         .iter() | ||||
|                         .filter(|(key, _)| key.0 == EventType::RoomMember) | ||||
|                         .count() | ||||
|             }) | ||||
|         }); | ||||
| 
 | ||||
|         let since_sender_member = since_state.as_ref().map(|since_state| { | ||||
|             since_state.as_ref().and_then(|state| { | ||||
|             let since_encryption = since_state.as_ref().map(|state| { | ||||
|                 state | ||||
|                     .get(&(EventType::RoomMember, sender_user.as_str().to_owned())) | ||||
|                     .and_then(|pdu| { | ||||
|                         serde_json::from_value::< | ||||
|                     .as_ref() | ||||
|                     .map(|state| state.get(&(EventType::RoomEncryption, "".to_owned()))) | ||||
|             }); | ||||
| 
 | ||||
|             // Calculations:
 | ||||
|             let new_encrypted_room = | ||||
|                 encrypted_room && since_encryption.map_or(false, |encryption| encryption.is_none()); | ||||
| 
 | ||||
|             let send_member_count = since_state.as_ref().map_or(false, |since_state| { | ||||
|                 since_state.as_ref().map_or(true, |since_state| { | ||||
|                     current_members.len() | ||||
|                         != since_state | ||||
|                             .iter() | ||||
|                             .filter(|(key, _)| key.0 == EventType::RoomMember) | ||||
|                             .count() | ||||
|                 }) | ||||
|             }); | ||||
| 
 | ||||
|             let since_sender_member = since_state.as_ref().map(|since_state| { | ||||
|                 since_state.as_ref().and_then(|state| { | ||||
|                     state | ||||
|                         .get(&(EventType::RoomMember, sender_user.as_str().to_owned())) | ||||
|                         .and_then(|pdu| { | ||||
|                             serde_json::from_value::< | ||||
|                                 Raw<ruma::events::room::member::MemberEventContent>, | ||||
|                             >(pdu.content.clone()) | ||||
|                             .expect("Raw::from_value always works") | ||||
|                             .deserialize() | ||||
|                             .map_err(|_| Error::bad_database("Invalid PDU in database.")) | ||||
|                             .ok() | ||||
|                     }) | ||||
|             }) | ||||
|         }); | ||||
|                         }) | ||||
|                 }) | ||||
|             }); | ||||
| 
 | ||||
|         if encrypted_room { | ||||
|             for (user_id, current_member) in current_members { | ||||
|                 let current_membership = serde_json::from_value::< | ||||
|                     Raw<ruma::events::room::member::MemberEventContent>, | ||||
|                 >(current_member.content.clone()) | ||||
|                 .expect("Raw::from_value always works") | ||||
|                 .deserialize() | ||||
|                 .map_err(|_| Error::bad_database("Invalid PDU in database."))? | ||||
|                 .membership; | ||||
|             if encrypted_room { | ||||
|                 for (user_id, current_member) in current_members { | ||||
|                     let current_membership = serde_json::from_value::< | ||||
|                         Raw<ruma::events::room::member::MemberEventContent>, | ||||
|                     >(current_member.content.clone()) | ||||
|                     .expect("Raw::from_value always works") | ||||
|                     .deserialize() | ||||
|                     .map_err(|_| Error::bad_database("Invalid PDU in database."))? | ||||
|                     .membership; | ||||
| 
 | ||||
|                 let since_membership = | ||||
|                     since_state | ||||
|                         .as_ref() | ||||
|                         .map_or(MembershipState::Join, |since_state| { | ||||
|                             since_state | ||||
|                                 .as_ref() | ||||
|                                 .and_then(|since_state| { | ||||
|                                     since_state | ||||
|                                         .get(&(EventType::RoomMember, user_id.clone())) | ||||
|                                         .and_then(|since_member| { | ||||
|                                             serde_json::from_value::< | ||||
|                     let since_membership = | ||||
|                         since_state | ||||
|                             .as_ref() | ||||
|                             .map_or(MembershipState::Join, |since_state| { | ||||
|                                 since_state | ||||
|                                     .as_ref() | ||||
|                                     .and_then(|since_state| { | ||||
|                                         since_state | ||||
|                                             .get(&(EventType::RoomMember, user_id.clone())) | ||||
|                                             .and_then(|since_member| { | ||||
|                                                 serde_json::from_value::< | ||||
|                                                 Raw<ruma::events::room::member::MemberEventContent>, | ||||
|                                             >( | ||||
|                                                 since_member.content.clone() | ||||
|  | @ -188,50 +196,158 @@ pub async fn sync_events_route( | |||
|                                                 Error::bad_database("Invalid PDU in database.") | ||||
|                                             }) | ||||
|                                             .ok() | ||||
|                                         }) | ||||
|                                 }) | ||||
|                                 .map_or(MembershipState::Leave, |member| member.membership) | ||||
|                         }); | ||||
|                                             }) | ||||
|                                     }) | ||||
|                                     .map_or(MembershipState::Leave, |member| member.membership) | ||||
|                             }); | ||||
| 
 | ||||
|                 let user_id = UserId::try_from(user_id.clone()) | ||||
|                     .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; | ||||
|                     let user_id = UserId::try_from(user_id.clone()) | ||||
|                         .map_err(|_| Error::bad_database("Invalid UserId in member PDU."))?; | ||||
| 
 | ||||
|                 match (since_membership, current_membership) { | ||||
|                     (MembershipState::Leave, MembershipState::Join) => { | ||||
|                         // A new user joined an encrypted room
 | ||||
|                         if !share_encrypted_room(&db, &sender_user, &user_id, &room_id) { | ||||
|                             device_list_updates.insert(user_id); | ||||
|                     match (since_membership, current_membership) { | ||||
|                         (MembershipState::Leave, MembershipState::Join) => { | ||||
|                             // A new user joined an encrypted room
 | ||||
|                             if !share_encrypted_room(&db, &sender_user, &user_id, &room_id) { | ||||
|                                 device_list_updates.insert(user_id); | ||||
|                             } | ||||
|                         } | ||||
|                         (MembershipState::Join, MembershipState::Leave) => { | ||||
|                             // Write down users that have left encrypted rooms we are in
 | ||||
|                             left_encrypted_users.insert(user_id); | ||||
|                         } | ||||
|                         _ => {} | ||||
|                     } | ||||
|                     (MembershipState::Join, MembershipState::Leave) => { | ||||
|                         // Write down users that have left encrypted rooms we are in
 | ||||
|                         left_encrypted_users.insert(user_id); | ||||
|                     } | ||||
|                     _ => {} | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let joined_since_last_sync = since_sender_member.map_or(false, |member| { | ||||
|             member.map_or(true, |member| member.membership != MembershipState::Join) | ||||
|         }); | ||||
|             let joined_since_last_sync = since_sender_member.map_or(false, |member| { | ||||
|                 member.map_or(true, |member| member.membership != MembershipState::Join) | ||||
|             }); | ||||
| 
 | ||||
|         if joined_since_last_sync && encrypted_room || new_encrypted_room { | ||||
|             // If the user is in a new encrypted room, give them all joined users
 | ||||
|             device_list_updates.extend( | ||||
|             if joined_since_last_sync && encrypted_room || new_encrypted_room { | ||||
|                 // If the user is in a new encrypted room, give them all joined users
 | ||||
|                 device_list_updates.extend( | ||||
|                     db.rooms | ||||
|                         .room_members(&room_id) | ||||
|                         .filter_map(|user_id| Some(user_id.ok()?)) | ||||
|                         .filter(|user_id| { | ||||
|                             // Don't send key updates from the sender to the sender
 | ||||
|                             sender_user != user_id | ||||
|                         }) | ||||
|                         .filter(|user_id| { | ||||
|                             // Only send keys if the sender doesn't share an encrypted room with the target already
 | ||||
|                             !share_encrypted_room(&db, sender_user, user_id, &room_id) | ||||
|                         }), | ||||
|                 ); | ||||
|             } | ||||
| 
 | ||||
|             let (joined_member_count, invited_member_count, heroes) = if send_member_count { | ||||
|                 let joined_member_count = db.rooms.room_members(&room_id).count(); | ||||
|                 let invited_member_count = db.rooms.room_members_invited(&room_id).count(); | ||||
| 
 | ||||
|                 // Recalculate heroes (first 5 members)
 | ||||
|                 let mut heroes = Vec::new(); | ||||
| 
 | ||||
|                 if joined_member_count + invited_member_count <= 5 { | ||||
|                     // Go through all PDUs and for each member event, check if the user is still joined or
 | ||||
|                     // invited until we have 5 or we reach the end
 | ||||
| 
 | ||||
|                     for hero in db | ||||
|                         .rooms | ||||
|                         .all_pdus(&sender_user, &room_id)? | ||||
|                         .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
 | ||||
|                         .filter(|(_, pdu)| pdu.kind == EventType::RoomMember) | ||||
|                         .map(|(_, pdu)| { | ||||
|                             let content = serde_json::from_value::< | ||||
|                                 Raw<ruma::events::room::member::MemberEventContent>, | ||||
|                             >(pdu.content.clone()) | ||||
|                             .expect("Raw::from_value always works") | ||||
|                             .deserialize() | ||||
|                             .map_err(|_| { | ||||
|                                 Error::bad_database("Invalid member event in database.") | ||||
|                             })?; | ||||
| 
 | ||||
|                             if let Some(state_key) = &pdu.state_key { | ||||
|                                 let user_id = | ||||
|                                     UserId::try_from(state_key.clone()).map_err(|_| { | ||||
|                                         Error::bad_database("Invalid UserId in member PDU.") | ||||
|                                     })?; | ||||
| 
 | ||||
|                                 // The membership was and still is invite or join
 | ||||
|                                 if matches!( | ||||
|                                     content.membership, | ||||
|                                     MembershipState::Join | MembershipState::Invite | ||||
|                                 ) && (db.rooms.is_joined(&user_id, &room_id)? | ||||
|                                     || db.rooms.is_invited(&user_id, &room_id)?) | ||||
|                                 { | ||||
|                                     Ok::<_, Error>(Some(state_key.clone())) | ||||
|                                 } else { | ||||
|                                     Ok(None) | ||||
|                                 } | ||||
|                             } else { | ||||
|                                 Ok(None) | ||||
|                             } | ||||
|                         }) | ||||
|                         .filter_map(|u| u.ok()) // Filter out buggy users
 | ||||
|                         // Filter for possible heroes
 | ||||
|                         .filter_map(|u| u) | ||||
|                     { | ||||
|                         if heroes.contains(&hero) || hero == sender_user.as_str() { | ||||
|                             continue; | ||||
|                         } | ||||
| 
 | ||||
|                         heroes.push(hero); | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 ( | ||||
|                     Some(joined_member_count), | ||||
|                     Some(invited_member_count), | ||||
|                     heroes, | ||||
|                 ) | ||||
|             } else { | ||||
|                 (None, None, Vec::new()) | ||||
|             }; | ||||
| 
 | ||||
|             let state_events = if joined_since_last_sync { | ||||
|                 db.rooms | ||||
|                     .room_members(&room_id) | ||||
|                     .filter_map(|user_id| Some(user_id.ok()?)) | ||||
|                     .filter(|user_id| { | ||||
|                         // Don't send key updates from the sender to the sender
 | ||||
|                         sender_user != user_id | ||||
|                     }) | ||||
|                     .filter(|user_id| { | ||||
|                         // Only send keys if the sender doesn't share an encrypted room with the target already
 | ||||
|                         !share_encrypted_room(&db, sender_user, user_id, &room_id) | ||||
|                     }), | ||||
|             ); | ||||
|         } | ||||
|                     .room_state_full(&room_id)? | ||||
|                     .into_iter() | ||||
|                     .map(|(_, pdu)| pdu.to_sync_state_event()) | ||||
|                     .collect() | ||||
|             } else { | ||||
|                 match since_state { | ||||
|                     None => Vec::new(), | ||||
|                     Some(Some(since_state)) => current_state | ||||
|                         .iter() | ||||
|                         .filter(|(key, value)| { | ||||
|                             since_state.get(key).map(|e| &e.event_id) != Some(&value.event_id) | ||||
|                         }) | ||||
|                         .filter(|(_, value)| { | ||||
|                             !timeline_pdus.iter().any(|(_, timeline_pdu)| { | ||||
|                                 timeline_pdu.kind == value.kind | ||||
|                                     && timeline_pdu.state_key == value.state_key | ||||
|                             }) | ||||
|                         }) | ||||
|                         .map(|(_, pdu)| pdu.to_sync_state_event()) | ||||
|                         .collect(), | ||||
|                     Some(None) => current_state | ||||
|                         .iter() | ||||
|                         .map(|(_, pdu)| pdu.to_sync_state_event()) | ||||
|                         .collect(), | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             ( | ||||
|                 heroes, | ||||
|                 joined_member_count, | ||||
|                 invited_member_count, | ||||
|                 joined_since_last_sync, | ||||
|                 state_events, | ||||
|             ) | ||||
|         } else { | ||||
|             (Vec::new(), None, None, false, Vec::new()) | ||||
|         }; | ||||
| 
 | ||||
|         // Look for device list updates in this room
 | ||||
|         device_list_updates.extend( | ||||
|  | @ -240,71 +356,6 @@ pub async fn sync_events_route( | |||
|                 .filter_map(|r| r.ok()), | ||||
|         ); | ||||
| 
 | ||||
|         let (joined_member_count, invited_member_count, heroes) = if send_member_count { | ||||
|             let joined_member_count = db.rooms.room_members(&room_id).count(); | ||||
|             let invited_member_count = db.rooms.room_members_invited(&room_id).count(); | ||||
| 
 | ||||
|             // Recalculate heroes (first 5 members)
 | ||||
|             let mut heroes = Vec::new(); | ||||
| 
 | ||||
|             if joined_member_count + invited_member_count <= 5 { | ||||
|                 // Go through all PDUs and for each member event, check if the user is still joined or
 | ||||
|                 // invited until we have 5 or we reach the end
 | ||||
| 
 | ||||
|                 for hero in db | ||||
|                     .rooms | ||||
|                     .all_pdus(&sender_user, &room_id)? | ||||
|                     .filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
 | ||||
|                     .filter(|(_, pdu)| pdu.kind == EventType::RoomMember) | ||||
|                     .map(|(_, pdu)| { | ||||
|                         let content = serde_json::from_value::< | ||||
|                             Raw<ruma::events::room::member::MemberEventContent>, | ||||
|                         >(pdu.content.clone()) | ||||
|                         .expect("Raw::from_value always works") | ||||
|                         .deserialize() | ||||
|                         .map_err(|_| Error::bad_database("Invalid member event in database."))?; | ||||
| 
 | ||||
|                         if let Some(state_key) = &pdu.state_key { | ||||
|                             let user_id = UserId::try_from(state_key.clone()).map_err(|_| { | ||||
|                                 Error::bad_database("Invalid UserId in member PDU.") | ||||
|                             })?; | ||||
| 
 | ||||
|                             // The membership was and still is invite or join
 | ||||
|                             if matches!( | ||||
|                                 content.membership, | ||||
|                                 MembershipState::Join | MembershipState::Invite | ||||
|                             ) && (db.rooms.is_joined(&user_id, &room_id)? | ||||
|                                 || db.rooms.is_invited(&user_id, &room_id)?) | ||||
|                             { | ||||
|                                 Ok::<_, Error>(Some(state_key.clone())) | ||||
|                             } else { | ||||
|                                 Ok(None) | ||||
|                             } | ||||
|                         } else { | ||||
|                             Ok(None) | ||||
|                         } | ||||
|                     }) | ||||
|                     .filter_map(|u| u.ok()) // Filter out buggy users
 | ||||
|                     // Filter for possible heroes
 | ||||
|                     .filter_map(|u| u) | ||||
|                 { | ||||
|                     if heroes.contains(&hero) || hero == sender_user.as_str() { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     heroes.push(hero); | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             ( | ||||
|                 Some(joined_member_count), | ||||
|                 Some(invited_member_count), | ||||
|                 heroes, | ||||
|             ) | ||||
|         } else { | ||||
|             (None, None, Vec::new()) | ||||
|         }; | ||||
| 
 | ||||
|         let notification_count = if send_notification_counts { | ||||
|             if let Some(last_read) = db.rooms.edus.private_read_get(&room_id, &sender_user)? { | ||||
|                 Some( | ||||
|  | @ -385,34 +436,7 @@ pub async fn sync_events_route( | |||
|                 events: room_events, | ||||
|             }, | ||||
|             state: sync_events::State { | ||||
|                 events: if joined_since_last_sync { | ||||
|                     db.rooms | ||||
|                         .room_state_full(&room_id)? | ||||
|                         .into_iter() | ||||
|                         .map(|(_, pdu)| pdu.to_sync_state_event()) | ||||
|                         .collect() | ||||
|                 } else { | ||||
|                     match since_state { | ||||
|                         None => Vec::new(), | ||||
|                         Some(Some(since_state)) => current_state | ||||
|                             .iter() | ||||
|                             .filter(|(key, value)| { | ||||
|                                 since_state.get(key).map(|e| &e.event_id) != Some(&value.event_id) | ||||
|                             }) | ||||
|                             .filter(|(_, value)| { | ||||
|                                 !timeline_pdus.iter().any(|(_, timeline_pdu)| { | ||||
|                                     timeline_pdu.kind == value.kind | ||||
|                                         && timeline_pdu.state_key == value.state_key | ||||
|                                 }) | ||||
|                             }) | ||||
|                             .map(|(_, pdu)| pdu.to_sync_state_event()) | ||||
|                             .collect(), | ||||
|                         Some(None) => current_state | ||||
|                             .iter() | ||||
|                             .map(|(_, pdu)| pdu.to_sync_state_event()) | ||||
|                             .collect(), | ||||
|                     } | ||||
|                 }, | ||||
|                 events: state_events, | ||||
|             }, | ||||
|             ephemeral: sync_events::Ephemeral { events: edus }, | ||||
|         }; | ||||
|  |  | |||
|  | @ -27,7 +27,7 @@ pub struct Config { | |||
|     server_name: Box<ServerName>, | ||||
|     database_path: String, | ||||
|     #[serde(default = "default_cache_capacity")] | ||||
|     cache_capacity: u64, | ||||
|     cache_capacity: u32, | ||||
|     #[serde(default = "default_max_request_size")] | ||||
|     max_request_size: u32, | ||||
|     #[serde(default = "default_max_concurrent_requests")] | ||||
|  | @ -48,7 +48,7 @@ fn true_fn() -> bool { | |||
|     true | ||||
| } | ||||
| 
 | ||||
| fn default_cache_capacity() -> u64 { | ||||
| fn default_cache_capacity() -> u32 { | ||||
|     1024 * 1024 * 1024 | ||||
| } | ||||
| 
 | ||||
|  | @ -93,8 +93,7 @@ impl Database { | |||
|     pub async fn load_or_create(config: Config) -> Result<Self> { | ||||
|         let db = sled::Config::default() | ||||
|             .path(&config.database_path) | ||||
|             .cache_capacity(config.cache_capacity) | ||||
|             .print_profile_on_drop(false) | ||||
|             .cache_capacity(config.cache_capacity as u64) | ||||
|             .open()?; | ||||
| 
 | ||||
|         info!("Opened sled database at {}", config.database_path); | ||||
|  |  | |||
|  | @ -1010,6 +1010,10 @@ impl Rooms { | |||
|                         .filter_map(|r| r.ok()) | ||||
|                         .any(|room_alias| aliases.is_match(room_alias.as_str())) | ||||
|                 }) || rooms.map_or(false, |rooms| rooms.contains(&room_id.as_str().into())) | ||||
|                     || self | ||||
|                         .room_members(&room_id) | ||||
|                         .filter_map(|r| r.ok()) | ||||
|                         .any(|member| users.iter().any(|regex| regex.is_match(member.as_str()))) | ||||
|                 { | ||||
|                     sending.send_pdu_appservice(&appservice.0, &pdu_id)?; | ||||
|                 } | ||||
|  |  | |||
|  | @ -8,7 +8,7 @@ use std::{ | |||
| 
 | ||||
| use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; | ||||
| use federation::transactions::send_transaction_message; | ||||
| use log::info; | ||||
| use log::{error, info}; | ||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||
| use ruma::{ | ||||
|     api::{appservice, federation, OutgoingRequest}, | ||||
|  | @ -131,6 +131,7 @@ impl Sending { | |||
|                                 }; | ||||
|                                 prefix.extend_from_slice(server.as_bytes()); | ||||
|                                 prefix.push(0xff); | ||||
| 
 | ||||
|                                 last_failed_try.insert(server.clone(), match last_failed_try.get(&server) { | ||||
|                                     Some(last_failed) => { | ||||
|                                         (last_failed.0+1, Instant::now()) | ||||
|  |  | |||
							
								
								
									
										32
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										32
									
								
								src/main.rs
									
									
									
									
									
								
							|  | @ -15,6 +15,7 @@ pub use database::Database; | |||
| pub use error::{ConduitLogger, Error, Result}; | ||||
| pub use pdu::PduEvent; | ||||
| pub use rocket::State; | ||||
| use ruma::api::client::error::ErrorKind; | ||||
| pub use ruma_wrapper::{ConduitResult, Ruma, RumaResponse}; | ||||
| 
 | ||||
| use log::LevelFilter; | ||||
|  | @ -154,7 +155,13 @@ fn setup_rocket() -> rocket::Rocket { | |||
|                 server_server::get_profile_information_route, | ||||
|             ], | ||||
|         ) | ||||
|         .register(catchers![not_found_catcher]) | ||||
|         .register(catchers![ | ||||
|             not_found_catcher, | ||||
|             forbidden_catcher, | ||||
|             unknown_token_catcher, | ||||
|             missing_token_catcher, | ||||
|             bad_json_catcher | ||||
|         ]) | ||||
|         .attach(AdHoc::on_attach("Config", |rocket| async { | ||||
|             let config = rocket | ||||
|                 .figment() | ||||
|  | @ -186,3 +193,26 @@ async fn main() { | |||
| fn not_found_catcher(_req: &'_ Request<'_>) -> String { | ||||
|     "404 Not Found".to_owned() | ||||
| } | ||||
| 
 | ||||
| #[catch(580)] | ||||
| fn forbidden_catcher() -> Result<()> { | ||||
|     Err(Error::BadRequest(ErrorKind::Forbidden, "Forbidden.")) | ||||
| } | ||||
| 
 | ||||
| #[catch(581)] | ||||
| fn unknown_token_catcher() -> Result<()> { | ||||
|     Err(Error::BadRequest( | ||||
|         ErrorKind::UnknownToken { soft_logout: false }, | ||||
|         "Unknown token.", | ||||
|     )) | ||||
| } | ||||
| 
 | ||||
| #[catch(582)] | ||||
| fn missing_token_catcher() -> Result<()> { | ||||
|     Err(Error::BadRequest(ErrorKind::MissingToken, "Missing token.")) | ||||
| } | ||||
| 
 | ||||
| #[catch(583)] | ||||
| fn bad_json_catcher() -> Result<()> { | ||||
|     Err(Error::BadRequest(ErrorKind::BadJson, "Bad json.")) | ||||
| } | ||||
|  |  | |||
|  | @ -45,7 +45,7 @@ where | |||
|         http::request::Request<std::vec::Vec<u8>>, | ||||
|     >>::Error: std::fmt::Debug, | ||||
| { | ||||
|     type Error = (); // TODO: Better error handling
 | ||||
|     type Error = (); | ||||
|     type Owned = Data; | ||||
|     type Borrowed = Self::Owned; | ||||
| 
 | ||||
|  | @ -82,7 +82,9 @@ where | |||
|                         registration | ||||
|                             .get("as_token") | ||||
|                             .and_then(|as_token| as_token.as_str()) | ||||
|                             .map_or(false, |as_token| token.as_deref() == Some(as_token)) | ||||
|                             .map_or(false, |as_token| { | ||||
|                                 dbg!(token.as_deref()) == dbg!(Some(as_token)) | ||||
|                             }) | ||||
|                     }) { | ||||
|                 match T::METADATA.authentication { | ||||
|                     AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => { | ||||
|  | @ -105,7 +107,8 @@ where | |||
|                         ); | ||||
| 
 | ||||
|                         if !db.users.exists(&user_id).unwrap() { | ||||
|                             return Failure((Status::Unauthorized, ())); | ||||
|                             // Forbidden
 | ||||
|                             return Failure((Status::raw(580), ())); | ||||
|                         } | ||||
| 
 | ||||
|                         // TODO: Check if appservice is allowed to be that user
 | ||||
|  | @ -119,15 +122,15 @@ where | |||
|                     AuthScheme::AccessToken | AuthScheme::QueryOnlyAccessToken => { | ||||
|                         if let Some(token) = token { | ||||
|                             match db.users.find_from_token(&token).unwrap() { | ||||
|                                 // TODO: M_UNKNOWN_TOKEN
 | ||||
|                                 None => return Failure((Status::Unauthorized, ())), | ||||
|                                 // Unknown Token
 | ||||
|                                 None => return Failure((Status::raw(581), ())), | ||||
|                                 Some((user_id, device_id)) => { | ||||
|                                     (Some(user_id), Some(device_id.into()), false) | ||||
|                                 } | ||||
|                             } | ||||
|                         } else { | ||||
|                             // TODO: M_MISSING_TOKEN
 | ||||
|                             return Failure((Status::Unauthorized, ())); | ||||
|                             // Missing Token
 | ||||
|                             return Failure((Status::raw(582), ())); | ||||
|                         } | ||||
|                     } | ||||
|                     AuthScheme::ServerSignatures => (None, None, false), | ||||
|  | @ -163,7 +166,7 @@ where | |||
|                 }), | ||||
|                 Err(e) => { | ||||
|                     warn!("{:?}", e); | ||||
|                     Failure((Status::BadRequest, ())) | ||||
|                     Failure((Status::raw(583), ())) | ||||
|                 } | ||||
|             } | ||||
|         }) | ||||
|  |  | |||
|  | @ -557,7 +557,7 @@ pub async fn send_transaction_message_route<'a>( | |||
|         pdu_id.push(0xff); | ||||
|         pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||
| 
 | ||||
|         db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||
|         let next_room_state = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||
| 
 | ||||
|         db.rooms.append_pdu( | ||||
|             &pdu, | ||||
|  | @ -569,6 +569,8 @@ pub async fn send_transaction_message_route<'a>( | |||
|             &db.admin, | ||||
|         )?; | ||||
| 
 | ||||
|         db.rooms.set_room_state(&room_id, &next_room_state)?; | ||||
| 
 | ||||
|         for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) { | ||||
|             db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?; | ||||
|         } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue