feat: make_join, send_join and /directory
This commit is contained in:
		
							parent
							
								
									04b08d4505
								
							
						
					
					
						commit
						eedac4fd96
					
				
					 4 changed files with 601 additions and 146 deletions
				
			
		|  | @ -564,7 +564,7 @@ async fn join_room_by_id_helper( | |||
|         pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||
| 
 | ||||
|         let pdu = PduEvent::from_id_val(&event_id, join_event.clone()) | ||||
|             .map_err(|_| Error::BadServerResponse("Invalid PDU in send_join response."))?; | ||||
|             .map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?; | ||||
| 
 | ||||
|         let mut state = BTreeMap::new(); | ||||
|         let pub_key_map = RwLock::new(BTreeMap::new()); | ||||
|  | @ -588,7 +588,7 @@ async fn join_room_by_id_helper( | |||
|                 Error::BadServerResponse("Invalid PDU in send_join response.") | ||||
|             })?; | ||||
| 
 | ||||
|             db.rooms.add_pdu_outlier(&pdu)?; | ||||
|             db.rooms.add_pdu_outlier(&event_id, &value)?; | ||||
|             if let Some(state_key) = &pdu.state_key { | ||||
|                 if pdu.kind == EventType::RoomMember { | ||||
|                     let target_user_id = UserId::try_from(state_key.clone()).map_err(|e| { | ||||
|  | @ -632,7 +632,11 @@ async fn join_room_by_id_helper( | |||
|             pdu.event_id.clone(), | ||||
|         ); | ||||
| 
 | ||||
|         db.rooms.force_state(room_id, state, &db.globals)?; | ||||
|         if state.get(&(EventType::RoomCreate, "".to_owned())).is_none() { | ||||
|             return Err(Error::BadServerResponse("State contained no create event.")); | ||||
|         } | ||||
| 
 | ||||
|         db.rooms.force_state(room_id, state, &db)?; | ||||
| 
 | ||||
|         for result in futures::future::join_all( | ||||
|             send_join_response | ||||
|  | @ -648,11 +652,7 @@ async fn join_room_by_id_helper( | |||
|                 Err(_) => continue, | ||||
|             }; | ||||
| 
 | ||||
|             let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| { | ||||
|                 warn!("{:?}: {}", value, e); | ||||
|                 Error::BadServerResponse("Invalid PDU in send_join response.") | ||||
|             })?; | ||||
|             db.rooms.add_pdu_outlier(&pdu)?; | ||||
|             db.rooms.add_pdu_outlier(&event_id, &value)?; | ||||
|         } | ||||
| 
 | ||||
|         // We append to state before appending the pdu, so we don't have a moment in time with the
 | ||||
|  |  | |||
|  | @ -241,7 +241,7 @@ impl Rooms { | |||
|         kind: &EventType, | ||||
|         sender: &UserId, | ||||
|         state_key: Option<&str>, | ||||
|         content: serde_json::Value, | ||||
|         content: &serde_json::Value, | ||||
|     ) -> Result<StateMap<Arc<PduEvent>>> { | ||||
|         let auth_events = state_res::auth_types_for_event( | ||||
|             kind, | ||||
|  | @ -295,7 +295,7 @@ impl Rooms { | |||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|         state: BTreeMap<(EventType, String), EventId>, | ||||
|         globals: &super::globals::Globals, | ||||
|         db: &Database, | ||||
|     ) -> Result<()> { | ||||
|         let state_hash = self.calculate_hash( | ||||
|             &state | ||||
|  | @ -304,57 +304,109 @@ impl Rooms { | |||
|                 .collect::<Vec<_>>(), | ||||
|         ); | ||||
| 
 | ||||
|         let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? { | ||||
|             Some(shortstatehash) => { | ||||
|                 // State already existed in db
 | ||||
|                 self.roomid_shortstatehash | ||||
|                     .insert(room_id.as_bytes(), &*shortstatehash)?; | ||||
|                 return Ok(()); | ||||
|             } | ||||
|             None => { | ||||
|                 let shortstatehash = globals.next_count()?; | ||||
|                 self.statehash_shortstatehash | ||||
|                     .insert(&state_hash, &shortstatehash.to_be_bytes())?; | ||||
|                 shortstatehash.to_be_bytes().to_vec() | ||||
|         let (shortstatehash, already_existed) = | ||||
|             match self.statehash_shortstatehash.get(&state_hash)? { | ||||
|                 Some(shortstatehash) => ( | ||||
|                     utils::u64_from_bytes(&shortstatehash) | ||||
|                         .map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?, | ||||
|                     true, | ||||
|                 ), | ||||
|                 None => { | ||||
|                     let shortstatehash = db.globals.next_count()?; | ||||
|                     self.statehash_shortstatehash | ||||
|                         .insert(&state_hash, &shortstatehash.to_be_bytes())?; | ||||
|                     (shortstatehash, false) | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|         let new_state = if !already_existed { | ||||
|             let mut new_state = HashSet::new(); | ||||
| 
 | ||||
|             for ((event_type, state_key), eventid) in state { | ||||
|                 new_state.insert(eventid.clone()); | ||||
| 
 | ||||
|                 let mut statekey = event_type.as_ref().as_bytes().to_vec(); | ||||
|                 statekey.push(0xff); | ||||
|                 statekey.extend_from_slice(&state_key.as_bytes()); | ||||
| 
 | ||||
|                 let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { | ||||
|                     Some(shortstatekey) => shortstatekey.to_vec(), | ||||
|                     None => { | ||||
|                         let shortstatekey = db.globals.next_count()?; | ||||
|                         self.statekey_shortstatekey | ||||
|                             .insert(&statekey, &shortstatekey.to_be_bytes())?; | ||||
|                         shortstatekey.to_be_bytes().to_vec() | ||||
|                     } | ||||
|                 }; | ||||
| 
 | ||||
|                 let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? { | ||||
|                     Some(shorteventid) => shorteventid.to_vec(), | ||||
|                     None => { | ||||
|                         let shorteventid = db.globals.next_count()?; | ||||
|                         self.eventid_shorteventid | ||||
|                             .insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?; | ||||
|                         self.shorteventid_eventid | ||||
|                             .insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?; | ||||
|                         shorteventid.to_be_bytes().to_vec() | ||||
|                     } | ||||
|                 }; | ||||
| 
 | ||||
|                 let mut state_id = shortstatehash.to_be_bytes().to_vec(); | ||||
|                 state_id.extend_from_slice(&shortstatekey); | ||||
| 
 | ||||
|                 self.stateid_shorteventid | ||||
|                     .insert(&state_id, &*shorteventid)?; | ||||
|             } | ||||
| 
 | ||||
|             new_state | ||||
|         } else { | ||||
|             self.state_full_ids(shortstatehash)?.into_iter().collect() | ||||
|         }; | ||||
| 
 | ||||
|         for ((event_type, state_key), eventid) in state { | ||||
|             let mut statekey = event_type.as_ref().as_bytes().to_vec(); | ||||
|             statekey.push(0xff); | ||||
|             statekey.extend_from_slice(&state_key.as_bytes()); | ||||
|         let old_state = self | ||||
|             .current_shortstatehash(&room_id)? | ||||
|             .map(|s| self.state_full_ids(s)) | ||||
|             .transpose()? | ||||
|             .map(|vec| vec.into_iter().collect::<HashSet<_>>()) | ||||
|             .unwrap_or_default(); | ||||
| 
 | ||||
|             let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { | ||||
|                 Some(shortstatekey) => shortstatekey.to_vec(), | ||||
|                 None => { | ||||
|                     let shortstatekey = globals.next_count()?; | ||||
|                     self.statekey_shortstatekey | ||||
|                         .insert(&statekey, &shortstatekey.to_be_bytes())?; | ||||
|                     shortstatekey.to_be_bytes().to_vec() | ||||
|         for event_id in new_state.difference(&old_state) { | ||||
|             if let Some(pdu) = self.get_pdu_json(event_id)? { | ||||
|                 if pdu.get("event_type") | ||||
|                     == Some(&CanonicalJsonValue::String("m.room.member".to_owned())) | ||||
|                 { | ||||
|                     if let Ok(pdu) = serde_json::from_value::<PduEvent>( | ||||
|                         serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"), | ||||
|                     ) { | ||||
|                         if let Some(membership) = | ||||
|                             pdu.content.get("membership").and_then(|membership| { | ||||
|                                 serde_json::from_value::<member::MembershipState>( | ||||
|                                     membership.clone(), | ||||
|                                 ) | ||||
|                                 .ok() | ||||
|                             }) | ||||
|                         { | ||||
|                             if let Some(state_key) = pdu | ||||
|                                 .state_key | ||||
|                                 .and_then(|state_key| UserId::try_from(state_key).ok()) | ||||
|                             { | ||||
|                                 self.update_membership( | ||||
|                                     room_id, | ||||
|                                     &state_key, | ||||
|                                     membership, | ||||
|                                     &pdu.sender, | ||||
|                                     None, | ||||
|                                     db, | ||||
|                                 )?; | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? { | ||||
|                 Some(shorteventid) => shorteventid.to_vec(), | ||||
|                 None => { | ||||
|                     let shorteventid = globals.next_count()?; | ||||
|                     self.eventid_shorteventid | ||||
|                         .insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?; | ||||
|                     self.shorteventid_eventid | ||||
|                         .insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?; | ||||
|                     shorteventid.to_be_bytes().to_vec() | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             let mut state_id = shortstatehash.clone(); | ||||
|             state_id.extend_from_slice(&shortstatekey); | ||||
| 
 | ||||
|             self.stateid_shorteventid | ||||
|                 .insert(&*state_id, &*shorteventid)?; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         self.roomid_shortstatehash | ||||
|             .insert(room_id.as_bytes(), &*shortstatehash)?; | ||||
|             .insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
|  | @ -591,10 +643,10 @@ impl Rooms { | |||
|     /// Append the PDU as an outlier.
 | ||||
|     ///
 | ||||
|     /// Any event given to this will be processed (state-res) on another thread.
 | ||||
|     pub fn add_pdu_outlier(&self, pdu: &PduEvent) -> Result<()> { | ||||
|     pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> { | ||||
|         self.eventid_outlierpdu.insert( | ||||
|             &pdu.event_id.as_bytes(), | ||||
|             &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), | ||||
|             &event_id.as_bytes(), | ||||
|             &*serde_json::to_string(&pdu).expect("CanonicalJsonObject is valid string"), | ||||
|         )?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|  | @ -1193,7 +1245,7 @@ impl Rooms { | |||
|             &event_type, | ||||
|             &sender, | ||||
|             state_key.as_deref(), | ||||
|             content.clone(), | ||||
|             &content, | ||||
|         )?; | ||||
| 
 | ||||
|         // Our depth is the maximum depth of prev_events + 1
 | ||||
|  |  | |||
|  | @ -168,7 +168,10 @@ fn setup_rocket() -> (rocket::Rocket, Config) { | |||
|                 server_server::get_event_route, | ||||
|                 server_server::get_missing_events_route, | ||||
|                 server_server::get_room_state_ids_route, | ||||
|                 server_server::create_join_event_template_route, | ||||
|                 server_server::create_join_event_route, | ||||
|                 server_server::create_invite_route, | ||||
|                 server_server::get_room_information_route, | ||||
|                 server_server::get_profile_information_route, | ||||
|             ], | ||||
|         ) | ||||
|  |  | |||
|  | @ -14,25 +14,32 @@ use ruma::{ | |||
|                 VerifyKey, | ||||
|             }, | ||||
|             event::{get_event, get_missing_events, get_room_state_ids}, | ||||
|             membership::create_invite, | ||||
|             query::get_profile_information, | ||||
|             membership::{ | ||||
|                 create_invite, | ||||
|                 create_join_event::{self, RoomState}, | ||||
|                 create_join_event_template, | ||||
|             }, | ||||
|             query::{get_profile_information, get_room_information}, | ||||
|             transactions::send_transaction_message, | ||||
|         }, | ||||
|         IncomingResponse, OutgoingRequest, OutgoingResponse, | ||||
|     }, | ||||
|     directory::{IncomingFilter, IncomingRoomNetwork}, | ||||
|     events::{ | ||||
|         room::{create::CreateEventContent, member::MembershipState}, | ||||
|         room::{ | ||||
|             create::CreateEventContent, | ||||
|             member::{MemberEventContent, MembershipState}, | ||||
|         }, | ||||
|         EventType, | ||||
|     }, | ||||
|     serde::{to_canonical_value, Raw}, | ||||
|     signatures::CanonicalJsonValue, | ||||
|     EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, | ||||
|     signatures::{CanonicalJsonObject, CanonicalJsonValue}, | ||||
|     uint, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, | ||||
| }; | ||||
| use state_res::{Event, EventMap, StateMap}; | ||||
| use std::{ | ||||
|     collections::{btree_map::Entry, BTreeMap, BTreeSet, HashSet}, | ||||
|     convert::TryFrom, | ||||
|     convert::{TryFrom, TryInto}, | ||||
|     fmt::Debug, | ||||
|     future::Future, | ||||
|     net::{IpAddr, SocketAddr}, | ||||
|  | @ -589,8 +596,8 @@ pub async fn send_transaction_message_route<'a>( | |||
|     Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) | ||||
| } | ||||
| 
 | ||||
| /// An async function that can recursively calls itself.
 | ||||
| type AsyncRecursiveResult<'a, T> = Pin<Box<dyn Future<Output = StdResult<T, String>> + 'a + Send>>; | ||||
| /// An async function that can recursively call itself.
 | ||||
| type AsyncRecursiveResult<'a, T, E> = Pin<Box<dyn Future<Output = StdResult<T, E>> + 'a + Send>>; | ||||
| 
 | ||||
| /// When receiving an event one needs to:
 | ||||
| /// 0. Skip the PDU if we already know about it
 | ||||
|  | @ -624,13 +631,13 @@ fn handle_incoming_pdu<'a>( | |||
|     db: &'a Database, | ||||
|     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, | ||||
|     auth_cache: &'a mut EventMap<Arc<PduEvent>>, | ||||
| ) -> AsyncRecursiveResult<'a, Arc<PduEvent>> { | ||||
| ) -> AsyncRecursiveResult<'a, Option<Vec<u8>>, String> { | ||||
|     Box::pin(async move { | ||||
|         // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
 | ||||
| 
 | ||||
|         // 0. Skip the PDU if we already know about it
 | ||||
|         if let Ok(Some(pdu)) = db.rooms.get_non_outlier_pdu(&event_id) { | ||||
|             return Ok(Arc::new(pdu)); | ||||
|         // 0. Skip the PDU if we already have it as a timeline event
 | ||||
|         if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(&event_id) { | ||||
|             return Ok(Some(pdu_id.to_vec())); | ||||
|         } | ||||
| 
 | ||||
|         // 1. Check the server is in the room
 | ||||
|  | @ -690,6 +697,7 @@ fn handle_incoming_pdu<'a>( | |||
|             } | ||||
|             Ok(ruma::signatures::Verified::Signatures) => { | ||||
|                 // Redact
 | ||||
|                 warn!("Calculated hash does not match: {}", event_id); | ||||
|                 match ruma::signatures::redact(&value, &room_version) { | ||||
|                     Ok(obj) => obj, | ||||
|                     Err(_) => return Err("Redaction failed".to_string()), | ||||
|  | @ -705,7 +713,7 @@ fn handle_incoming_pdu<'a>( | |||
|             to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"), | ||||
|         ); | ||||
|         let incoming_pdu = serde_json::from_value::<PduEvent>( | ||||
|             serde_json::to_value(val).expect("CanonicalJsonObj is a valid JsonValue"), | ||||
|             serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"), | ||||
|         ) | ||||
|         .map_err(|_| "Event is not a valid PDU.".to_string())?; | ||||
| 
 | ||||
|  | @ -793,13 +801,13 @@ fn handle_incoming_pdu<'a>( | |||
| 
 | ||||
|         // 7. Persist the event as an outlier.
 | ||||
|         db.rooms | ||||
|             .add_pdu_outlier(&incoming_pdu) | ||||
|             .add_pdu_outlier(&incoming_pdu.event_id, &val) | ||||
|             .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; | ||||
|         debug!("Added pdu as outlier."); | ||||
| 
 | ||||
|         // 8. if not timeline event: stop
 | ||||
|         if !is_timeline_event { | ||||
|             return Ok(incoming_pdu); | ||||
|             return Ok(None); | ||||
|         } | ||||
| 
 | ||||
|         // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
 | ||||
|  | @ -811,7 +819,54 @@ fn handle_incoming_pdu<'a>( | |||
|         // the state from a known point and resolve if > 1 prev_event
 | ||||
| 
 | ||||
|         debug!("Requesting state at event."); | ||||
|         let (state_at_incoming_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) = | ||||
|         let mut state_at_incoming_event = None; | ||||
|         let mut incoming_auth_events = Vec::new(); | ||||
| 
 | ||||
|         if incoming_pdu.prev_events.len() == 1 { | ||||
|             let prev_event = &incoming_pdu.prev_events[0]; | ||||
|             let state_vec = db | ||||
|                 .rooms | ||||
|                 .pdu_shortstatehash(prev_event) | ||||
|                 .map_err(|_| "Failed talking to db".to_owned())? | ||||
|                 .map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok()) | ||||
|                 .flatten(); | ||||
|             if let Some(mut state_vec) = state_vec { | ||||
|                 if db | ||||
|                     .rooms | ||||
|                     .get_pdu(prev_event) | ||||
|                     .ok() | ||||
|                     .flatten() | ||||
|                     .ok_or_else(|| "Could not find prev event, but we know the state.".to_owned())? | ||||
|                     .state_key | ||||
|                     .is_some() | ||||
|                 { | ||||
|                     state_vec.push(prev_event.clone()); | ||||
|                 } | ||||
|                 state_at_incoming_event = Some( | ||||
|                     fetch_and_handle_events(db, origin, &state_vec, pub_key_map, auth_cache) | ||||
|                         .await | ||||
|                         .map_err(|_| "Failed to fetch state events locally".to_owned())? | ||||
|                         .into_iter() | ||||
|                         .map(|pdu| { | ||||
|                             ( | ||||
|                                 ( | ||||
|                                     pdu.kind.clone(), | ||||
|                                     pdu.state_key | ||||
|                                         .clone() | ||||
|                                         .expect("events from state_full_ids are state events"), | ||||
|                                 ), | ||||
|                                 pdu, | ||||
|                             ) | ||||
|                         }) | ||||
|                         .collect(), | ||||
|                 ); | ||||
|             } | ||||
|             &state_at_incoming_event; | ||||
| 
 | ||||
|             // TODO: set incoming_auth_events?
 | ||||
|         } | ||||
| 
 | ||||
|         if state_at_incoming_event.is_none() { | ||||
|             // Call /state_ids to find out what the state at this pdu is. We trust the server's
 | ||||
|             // response to some extend, but we still do a lot of checks on the events
 | ||||
|             match db | ||||
|  | @ -856,12 +911,16 @@ fn handle_incoming_pdu<'a>( | |||
|                     } | ||||
| 
 | ||||
|                     // The original create event must still be in the state
 | ||||
|                     if state.get(&(EventType::RoomCreate, "".to_owned())).map(|a| a.as_ref()) != Some(&create_event) { | ||||
|                     if state | ||||
|                         .get(&(EventType::RoomCreate, "".to_owned())) | ||||
|                         .map(|a| a.as_ref()) | ||||
|                         != Some(&create_event) | ||||
|                     { | ||||
|                         return Err("Incoming event refers to wrong create event.".to_owned()); | ||||
|                     } | ||||
| 
 | ||||
|                     debug!("Fetching auth chain events at event."); | ||||
|                     let incoming_auth_events = match fetch_and_handle_events( | ||||
|                     incoming_auth_events = match fetch_and_handle_events( | ||||
|                         &db, | ||||
|                         origin, | ||||
|                         &res.auth_chain_ids, | ||||
|  | @ -874,12 +933,16 @@ fn handle_incoming_pdu<'a>( | |||
|                         Err(_) => return Err("Failed to fetch auth chain.".to_owned()), | ||||
|                     }; | ||||
| 
 | ||||
|                     (state, incoming_auth_events) | ||||
|                     state_at_incoming_event = Some(state); | ||||
|                 } | ||||
|                 Err(_) => { | ||||
|                     return Err("Fetching state for event failed".into()); | ||||
|                 } | ||||
|             }; | ||||
|         } | ||||
| 
 | ||||
|         let state_at_incoming_event = | ||||
|             state_at_incoming_event.expect("we always set this to some above"); | ||||
| 
 | ||||
|         // 11. Check the auth of the event passes based on the state of the event
 | ||||
|         if !state_res::event_auth::auth_check( | ||||
|  | @ -1079,20 +1142,26 @@ fn handle_incoming_pdu<'a>( | |||
|         // Now that the event has passed all auth it is added into the timeline.
 | ||||
|         // We use the `state_at_event` instead of `state_after` so we accurately
 | ||||
|         // represent the state for this event.
 | ||||
|         append_incoming_pdu(&db, &incoming_pdu, extremities, &state_at_incoming_event) | ||||
|             .map_err(|_| "Failed to add pdu to db.".to_owned())?; | ||||
|         let pdu_id = append_incoming_pdu( | ||||
|             &db, | ||||
|             &incoming_pdu, | ||||
|             val, | ||||
|             extremities, | ||||
|             &state_at_incoming_event, | ||||
|         ) | ||||
|         .map_err(|_| "Failed to add pdu to db.".to_owned())?; | ||||
|         debug!("Appended incoming pdu."); | ||||
| 
 | ||||
|         // Set the new room state to the resolved state
 | ||||
|         if update_state { | ||||
|             db.rooms | ||||
|                 .force_state(&room_id, new_room_state, &db.globals) | ||||
|                 .force_state(&room_id, new_room_state, &db) | ||||
|                 .map_err(|_| "Failed to set new room state.".to_owned())?; | ||||
|         } | ||||
|         debug!("Updated resolved state"); | ||||
| 
 | ||||
|         // Event has passed all auth/stateres checks
 | ||||
|         Ok(incoming_pdu) | ||||
|         Ok(Some(pdu_id)) | ||||
|     }) | ||||
| } | ||||
| 
 | ||||
|  | @ -1108,77 +1177,93 @@ fn handle_incoming_pdu<'a>( | |||
| /// If the event is unknown to the `auth_cache` it is added. This guarantees that any
 | ||||
| /// event we need to know of will be present.
 | ||||
| //#[tracing::instrument(skip(db, key_map, auth_cache))]
 | ||||
| pub(crate) async fn fetch_and_handle_events( | ||||
|     db: &Database, | ||||
|     origin: &ServerName, | ||||
|     events: &[EventId], | ||||
|     pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>, | ||||
|     auth_cache: &mut EventMap<Arc<PduEvent>>, | ||||
| ) -> Result<Vec<Arc<PduEvent>>> { | ||||
|     let mut pdus = vec![]; | ||||
|     for id in events { | ||||
|         // a. Look at auth cache
 | ||||
|         let pdu = match auth_cache.get(id) { | ||||
|             Some(pdu) => { | ||||
|                 debug!("Found {} in cache", id); | ||||
|                 pdu.clone() | ||||
|             } | ||||
|             // b. Look in the main timeline (pduid_pdu tree)
 | ||||
|             // c. Look at outlier pdu tree
 | ||||
|             // (get_pdu checks both)
 | ||||
|             None => match db.rooms.get_pdu(&id)? { | ||||
|                 Some(pdu) => { | ||||
|                     debug!("Found {} in outliers", id); | ||||
|                     Arc::new(pdu) | ||||
|                 } | ||||
|                 None => { | ||||
|                     // d. Ask origin server over federation
 | ||||
|                     debug!("Fetching {} over federation.", id); | ||||
|                     match db | ||||
|                         .sending | ||||
|                         .send_federation_request( | ||||
|                             &db.globals, | ||||
|                             origin, | ||||
|                             get_event::v1::Request { event_id: &id }, | ||||
|                         ) | ||||
|                         .await | ||||
|                     { | ||||
|                         Ok(res) => { | ||||
|                             debug!("Got {} over federation: {:?}", id, res); | ||||
|                             let (event_id, value) = | ||||
|                                 crate::pdu::gen_event_id_canonical_json(&res.pdu)?; | ||||
|                             let pdu = match handle_incoming_pdu( | ||||
|                                 origin, | ||||
|                                 &event_id, | ||||
|                                 value, | ||||
|                                 false, | ||||
| pub(crate) fn fetch_and_handle_events<'a>( | ||||
|     db: &'a Database, | ||||
|     origin: &'a ServerName, | ||||
|     events: &'a [EventId], | ||||
|     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>, | ||||
|     auth_cache: &'a mut EventMap<Arc<PduEvent>>, | ||||
| ) -> AsyncRecursiveResult<'a, Vec<Arc<PduEvent>>, Error> { | ||||
|     Box::pin(async move { | ||||
|         let mut pdus = vec![]; | ||||
|         for id in events { | ||||
|             // a. Look at auth cache
 | ||||
|             let pdu = | ||||
|                 match auth_cache.get(id) { | ||||
|                     Some(pdu) => { | ||||
|                         debug!("Found {} in cache", id); | ||||
|                         // We already have the auth chain for events in cache
 | ||||
|                         pdu.clone() | ||||
|                     } | ||||
|                     // b. Look in the main timeline (pduid_pdu tree)
 | ||||
|                     // c. Look at outlier pdu tree
 | ||||
|                     // (get_pdu checks both)
 | ||||
|                     None => match db.rooms.get_pdu(&id)? { | ||||
|                         Some(pdu) => { | ||||
|                             debug!("Found {} in db", id); | ||||
|                             // We need to fetch the auth chain
 | ||||
|                             let _ = fetch_and_handle_events( | ||||
|                                 db, | ||||
|                                 origin, | ||||
|                                 &pdu.auth_events, | ||||
|                                 pub_key_map, | ||||
|                                 auth_cache, | ||||
|                             ) | ||||
|                             .await | ||||
|                             .await?; | ||||
|                             Arc::new(pdu) | ||||
|                         } | ||||
|                         None => { | ||||
|                             // d. Ask origin server over federation
 | ||||
|                             debug!("Fetching {} over federation.", id); | ||||
|                             match db | ||||
|                                 .sending | ||||
|                                 .send_federation_request( | ||||
|                                     &db.globals, | ||||
|                                     origin, | ||||
|                                     get_event::v1::Request { event_id: &id }, | ||||
|                                 ) | ||||
|                                 .await | ||||
|                             { | ||||
|                                 Ok(pdu) => pdu, | ||||
|                                 Err(e) => { | ||||
|                                     warn!("Authentication of event {} failed: {:?}", id, e); | ||||
|                                 Ok(res) => { | ||||
|                                     debug!("Got {} over federation: {:?}", id, res); | ||||
|                                     let (event_id, value) = | ||||
|                                         crate::pdu::gen_event_id_canonical_json(&res.pdu)?; | ||||
|                                     // This will also fetch the auth chain
 | ||||
|                                     match handle_incoming_pdu( | ||||
|                                     origin, | ||||
|                                     &event_id, | ||||
|                                     value.clone(), | ||||
|                                     false, | ||||
|                                     db, | ||||
|                                     pub_key_map, | ||||
|                                     auth_cache, | ||||
|                                 ) | ||||
|                                 .await | ||||
|                                 { | ||||
|                                     Ok(_) => Arc::new(serde_json::from_value( | ||||
|                                         serde_json::to_value(value) | ||||
|                                             .expect("canonicaljsonobject is valid value"), | ||||
|                                     ) | ||||
|                                     .expect("This is possible because handle_incoming_pdu worked")), | ||||
|                                     Err(e) => { | ||||
|                                         warn!("Authentication of event {} failed: {:?}", id, e); | ||||
|                                         continue; | ||||
|                                     } | ||||
|                                 } | ||||
|                                 } | ||||
|                                 Err(_) => { | ||||
|                                     warn!("Failed to fetch event: {}", id); | ||||
|                                     continue; | ||||
|                                 } | ||||
|                             }; | ||||
| 
 | ||||
|                             pdu | ||||
|                             } | ||||
|                         } | ||||
|                         Err(_) => { | ||||
|                             warn!("Failed to fetch event: {}", id); | ||||
|                             continue; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             }, | ||||
|         }; | ||||
|         auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone()); | ||||
|         pdus.push(pdu); | ||||
|     } | ||||
|     Ok(pdus) | ||||
|                     }, | ||||
|                 }; | ||||
|             auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone()); | ||||
|             pdus.push(pdu); | ||||
|         } | ||||
|         Ok(pdus) | ||||
|     }) | ||||
| } | ||||
| 
 | ||||
| /// Search the DB for the signing keys of the given server, if we don't have them
 | ||||
|  | @ -1280,9 +1365,10 @@ pub(crate) async fn fetch_signing_keys( | |||
| pub(crate) fn append_incoming_pdu( | ||||
|     db: &Database, | ||||
|     pdu: &PduEvent, | ||||
|     pdu_json: CanonicalJsonObject, | ||||
|     new_room_leaves: HashSet<EventId>, | ||||
|     state: &StateMap<Arc<PduEvent>>, | ||||
| ) -> Result<()> { | ||||
| ) -> Result<Vec<u8>> { | ||||
|     let count = db.globals.next_count()?; | ||||
|     let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||
|     pdu_id.push(0xff); | ||||
|  | @ -1295,7 +1381,7 @@ pub(crate) fn append_incoming_pdu( | |||
| 
 | ||||
|     db.rooms.append_pdu( | ||||
|         pdu, | ||||
|         utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), | ||||
|         pdu_json, | ||||
|         count, | ||||
|         pdu_id.clone().into(), | ||||
|         &new_room_leaves.into_iter().collect::<Vec<_>>(), | ||||
|  | @ -1366,7 +1452,7 @@ pub(crate) fn append_incoming_pdu( | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
|     Ok(pdu_id) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|  | @ -1495,6 +1581,291 @@ pub fn get_room_state_ids_route<'a>( | |||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     get("/_matrix/federation/v1/make_join/<_>/<_>", data = "<body>") | ||||
| )] | ||||
| #[tracing::instrument(skip(db, body))] | ||||
| pub fn create_join_event_template_route<'a>( | ||||
|     db: State<'a, Database>, | ||||
|     body: Ruma<create_join_event_template::v1::Request<'_>>, | ||||
| ) -> ConduitResult<create_join_event_template::v1::Response> { | ||||
|     if !db.globals.allow_federation() { | ||||
|         return Err(Error::bad_config("Federation is disabled.")); | ||||
|     } | ||||
| 
 | ||||
|     if !db.rooms.exists(&body.room_id)? { | ||||
|         return Err(Error::BadRequest( | ||||
|             ErrorKind::NotFound, | ||||
|             "Server is not in room.", | ||||
|         )); | ||||
|     } | ||||
| 
 | ||||
|     if !body.ver.contains(&RoomVersionId::Version6) { | ||||
|         return Err(Error::BadRequest( | ||||
|             ErrorKind::IncompatibleRoomVersion { | ||||
|                 room_version: RoomVersionId::Version6, | ||||
|             }, | ||||
|             "Room version not supported.", | ||||
|         )); | ||||
|     } | ||||
| 
 | ||||
|     let prev_events = db | ||||
|         .rooms | ||||
|         .get_pdu_leaves(&body.room_id)? | ||||
|         .into_iter() | ||||
|         .take(20) | ||||
|         .collect::<Vec<_>>(); | ||||
| 
 | ||||
|     let create_event = db | ||||
|         .rooms | ||||
|         .room_state_get(&body.room_id, &EventType::RoomCreate, "")?; | ||||
| 
 | ||||
|     let create_event_content = create_event | ||||
|         .as_ref() | ||||
|         .map(|create_event| { | ||||
|             Ok::<_, Error>( | ||||
|                 serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) | ||||
|                     .expect("Raw::from_value always works.") | ||||
|                     .deserialize() | ||||
|                     .map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?, | ||||
|             ) | ||||
|         }) | ||||
|         .transpose()?; | ||||
| 
 | ||||
|     let create_prev_event = if prev_events.len() == 1 | ||||
|         && Some(&prev_events[0]) == create_event.as_ref().map(|c| &c.event_id) | ||||
|     { | ||||
|         create_event.map(Arc::new) | ||||
|     } else { | ||||
|         None | ||||
|     }; | ||||
| 
 | ||||
|     // If there was no create event yet, assume we are creating a version 6 room right now
 | ||||
|     let room_version = create_event_content.map_or(RoomVersionId::Version6, |create_event| { | ||||
|         create_event.room_version | ||||
|     }); | ||||
| 
 | ||||
|     let content = serde_json::to_value(MemberEventContent { | ||||
|         avatar_url: None, | ||||
|         displayname: None, | ||||
|         is_direct: None, | ||||
|         membership: MembershipState::Join, | ||||
|         third_party_invite: None, | ||||
|     }) | ||||
|     .expect("member event is valid value"); | ||||
| 
 | ||||
|     let state_key = body.user_id.to_string(); | ||||
|     let kind = EventType::RoomMember; | ||||
| 
 | ||||
|     let auth_events = db.rooms.get_auth_events( | ||||
|         &body.room_id, | ||||
|         &kind, | ||||
|         &body.user_id, | ||||
|         Some(&state_key), | ||||
|         &content, | ||||
|     )?; | ||||
| 
 | ||||
|     // Our depth is the maximum depth of prev_events + 1
 | ||||
|     let depth = prev_events | ||||
|         .iter() | ||||
|         .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) | ||||
|         .max() | ||||
|         .unwrap_or_else(|| uint!(0)) | ||||
|         + uint!(1); | ||||
| 
 | ||||
|     let mut unsigned = BTreeMap::new(); | ||||
| 
 | ||||
|     if let Some(prev_pdu) = db.rooms.room_state_get(&body.room_id, &kind, &state_key)? { | ||||
|         unsigned.insert("prev_content".to_owned(), prev_pdu.content); | ||||
|         unsigned.insert( | ||||
|             "prev_sender".to_owned(), | ||||
|             serde_json::to_value(prev_pdu.sender).expect("UserId::to_value always works"), | ||||
|         ); | ||||
|     } | ||||
| 
 | ||||
|     let pdu = PduEvent { | ||||
|         event_id: ruma::event_id!("$thiswillbefilledinlater"), | ||||
|         room_id: body.room_id.clone(), | ||||
|         sender: body.user_id.clone(), | ||||
|         origin_server_ts: utils::millis_since_unix_epoch() | ||||
|             .try_into() | ||||
|             .expect("time is valid"), | ||||
|         kind, | ||||
|         content, | ||||
|         state_key: Some(state_key), | ||||
|         prev_events, | ||||
|         depth, | ||||
|         auth_events: auth_events | ||||
|             .iter() | ||||
|             .map(|(_, pdu)| pdu.event_id.clone()) | ||||
|             .collect(), | ||||
|         redacts: None, | ||||
|         unsigned, | ||||
|         hashes: ruma::events::pdu::EventHash { | ||||
|             sha256: "aaa".to_owned(), | ||||
|         }, | ||||
|         signatures: BTreeMap::new(), | ||||
|     }; | ||||
| 
 | ||||
|     let auth_check = state_res::auth_check( | ||||
|         &room_version, | ||||
|         &Arc::new(pdu.clone()), | ||||
|         create_prev_event, | ||||
|         &auth_events, | ||||
|         None, // TODO: third_party_invite
 | ||||
|     ) | ||||
|     .map_err(|e| { | ||||
|         error!("{:?}", e); | ||||
|         Error::bad_database("Auth check failed.") | ||||
|     })?; | ||||
| 
 | ||||
|     if !auth_check { | ||||
|         return Err(Error::BadRequest( | ||||
|             ErrorKind::InvalidParam, | ||||
|             "Event is not authorized.", | ||||
|         )); | ||||
|     } | ||||
| 
 | ||||
|     // Hash and sign
 | ||||
|     let mut pdu_json = | ||||
|         utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); | ||||
| 
 | ||||
|     pdu_json.remove("event_id"); | ||||
| 
 | ||||
|     // Add origin because synapse likes that (and it's required in the spec)
 | ||||
|     pdu_json.insert( | ||||
|         "origin".to_owned(), | ||||
|         to_canonical_value(db.globals.server_name()) | ||||
|             .expect("server name is a valid CanonicalJsonValue"), | ||||
|     ); | ||||
| 
 | ||||
|     Ok(create_join_event_template::v1::Response { | ||||
|         room_version: Some(RoomVersionId::Version6), | ||||
|         event: serde_json::from_value::<Raw<_>>( | ||||
|             serde_json::to_value(pdu_json).expect("CanonicalJson is valid serde_json::Value"), | ||||
|         ) | ||||
|         .expect("Raw::from_value always works"), | ||||
|     } | ||||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     put("/_matrix/federation/v2/send_join/<_>/<_>", data = "<body>") | ||||
| )] | ||||
| #[tracing::instrument(skip(db, body))] | ||||
| pub async fn create_join_event_route<'a>( | ||||
|     db: State<'a, Database>, | ||||
|     body: Ruma<create_join_event::v2::Request<'_>>, | ||||
| ) -> ConduitResult<create_join_event::v2::Response> { | ||||
|     if !db.globals.allow_federation() { | ||||
|         return Err(Error::bad_config("Federation is disabled.")); | ||||
|     } | ||||
| 
 | ||||
|     // We need to return the state prior to joining, let's keep a reference to that here
 | ||||
|     let shortstatehash = | ||||
|         db.rooms | ||||
|             .current_shortstatehash(&body.room_id)? | ||||
|             .ok_or(Error::BadRequest( | ||||
|                 ErrorKind::NotFound, | ||||
|                 "Pdu state not found.", | ||||
|             ))?; | ||||
| 
 | ||||
|     let pub_key_map = RwLock::new(BTreeMap::new()); | ||||
|     let mut auth_cache = EventMap::new(); | ||||
| 
 | ||||
|     // We do not add the event_id field to the pdu here because of signature and hashes checks
 | ||||
|     let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&body.pdu) { | ||||
|         Ok(t) => t, | ||||
|         Err(_) => { | ||||
|             // Event could not be converted to canonical json
 | ||||
|             return Err(Error::BadRequest( | ||||
|                 ErrorKind::InvalidParam, | ||||
|                 "Could not convert event to canonical json.", | ||||
|             )); | ||||
|         } | ||||
|     }; | ||||
| 
 | ||||
|     let origin = serde_json::from_value::<Box<ServerName>>( | ||||
|         serde_json::to_value(value.get("origin").ok_or(Error::BadRequest( | ||||
|             ErrorKind::InvalidParam, | ||||
|             "Event needs an origin field.", | ||||
|         ))?) | ||||
|         .expect("CanonicalJson is valid json value"), | ||||
|     ) | ||||
|     .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; | ||||
| 
 | ||||
|     let pdu_id = handle_incoming_pdu( | ||||
|         &origin, | ||||
|         &event_id, | ||||
|         value, | ||||
|         true, | ||||
|         &db, | ||||
|         &pub_key_map, | ||||
|         &mut auth_cache, | ||||
|     ) | ||||
|     .await | ||||
|     .map_err(|_| { | ||||
|         Error::BadRequest( | ||||
|             ErrorKind::InvalidParam, | ||||
|             "Error while handling incoming PDU.", | ||||
|         ) | ||||
|     })? | ||||
|     .ok_or(Error::BadRequest( | ||||
|         ErrorKind::InvalidParam, | ||||
|         "Could not accept incoming PDU as timeline event.", | ||||
|     ))?; | ||||
| 
 | ||||
|     let state_ids = db.rooms.state_full_ids(shortstatehash)?; | ||||
| 
 | ||||
|     let mut auth_chain_ids = BTreeSet::<EventId>::new(); | ||||
|     let mut todo = state_ids.iter().cloned().collect::<BTreeSet<_>>(); | ||||
| 
 | ||||
|     while let Some(event_id) = todo.iter().next().cloned() { | ||||
|         if let Some(pdu) = db.rooms.get_pdu(&event_id)? { | ||||
|             todo.extend( | ||||
|                 pdu.auth_events | ||||
|                     .clone() | ||||
|                     .into_iter() | ||||
|                     .collect::<BTreeSet<_>>() | ||||
|                     .difference(&auth_chain_ids) | ||||
|                     .cloned(), | ||||
|             ); | ||||
|             auth_chain_ids.extend(pdu.auth_events.into_iter()); | ||||
|         } else { | ||||
|             warn!("Could not find pdu mentioned in auth events."); | ||||
|         } | ||||
| 
 | ||||
|         todo.remove(&event_id); | ||||
|     } | ||||
| 
 | ||||
|     for server in db | ||||
|         .rooms | ||||
|         .room_servers(&body.room_id) | ||||
|         .filter_map(|r| r.ok()) | ||||
|         .filter(|server| &**server != db.globals.server_name()) | ||||
|     { | ||||
|         db.sending.send_pdu(&server, &pdu_id)?; | ||||
|     } | ||||
| 
 | ||||
|     Ok(create_join_event::v2::Response { | ||||
|         room_state: RoomState { | ||||
|             auth_chain: auth_chain_ids | ||||
|                 .iter() | ||||
|                 .filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten()) | ||||
|                 .map(|json| PduEvent::convert_to_outgoing_federation_event(json)) | ||||
|                 .collect(), | ||||
|             state: state_ids | ||||
|                 .iter() | ||||
|                 .filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten()) | ||||
|                 .map(|json| PduEvent::convert_to_outgoing_federation_event(json)) | ||||
|                 .collect(), | ||||
|         }, | ||||
|     } | ||||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     put("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>") | ||||
|  | @ -1504,6 +1875,10 @@ pub async fn create_invite_route<'a>( | |||
|     db: State<'a, Database>, | ||||
|     body: Ruma<create_invite::v2::Request>, | ||||
| ) -> ConduitResult<create_invite::v2::Response> { | ||||
|     if !db.globals.allow_federation() { | ||||
|         return Err(Error::bad_config("Federation is disabled.")); | ||||
|     } | ||||
| 
 | ||||
|     if body.room_version < RoomVersionId::Version6 { | ||||
|         return Err(Error::BadRequest( | ||||
|             ErrorKind::IncompatibleRoomVersion { | ||||
|  | @ -1599,6 +1974,31 @@ pub async fn create_invite_route<'a>( | |||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     get("/_matrix/federation/v1/query/directory", data = "<body>") | ||||
| )] | ||||
| #[tracing::instrument(skip(db, body))] | ||||
| pub fn get_room_information_route<'a>( | ||||
|     db: State<'a, Database>, | ||||
|     body: Ruma<get_room_information::v1::Request<'_>>, | ||||
| ) -> ConduitResult<get_room_information::v1::Response> { | ||||
|     if !db.globals.allow_federation() { | ||||
|         return Err(Error::bad_config("Federation is disabled.")); | ||||
|     } | ||||
| 
 | ||||
|     let room_id = db | ||||
|         .rooms | ||||
|         .id_from_alias(&body.room_alias)? | ||||
|         .ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Room alias not found."))?; | ||||
| 
 | ||||
|     Ok(get_room_information::v1::Response { | ||||
|         room_id, | ||||
|         servers: vec![db.globals.server_name().to_owned()], | ||||
|     } | ||||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     get("/_matrix/federation/v1/query/profile", data = "<body>") | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue