diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index ab646a1..dcd7c37 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -564,7 +564,7 @@ async fn join_room_by_id_helper( pdu_id.extend_from_slice(&count.to_be_bytes()); let pdu = PduEvent::from_id_val(&event_id, join_event.clone()) - .map_err(|_| Error::BadServerResponse("Invalid PDU in send_join response."))?; + .map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?; let mut state = BTreeMap::new(); let pub_key_map = RwLock::new(BTreeMap::new()); @@ -588,7 +588,7 @@ async fn join_room_by_id_helper( Error::BadServerResponse("Invalid PDU in send_join response.") })?; - db.rooms.add_pdu_outlier(&pdu)?; + db.rooms.add_pdu_outlier(&event_id, &value)?; if let Some(state_key) = &pdu.state_key { if pdu.kind == EventType::RoomMember { let target_user_id = UserId::try_from(state_key.clone()).map_err(|e| { @@ -632,7 +632,11 @@ async fn join_room_by_id_helper( pdu.event_id.clone(), ); - db.rooms.force_state(room_id, state, &db.globals)?; + if state.get(&(EventType::RoomCreate, "".to_owned())).is_none() { + return Err(Error::BadServerResponse("State contained no create event.")); + } + + db.rooms.force_state(room_id, state, &db)?; for result in futures::future::join_all( send_join_response @@ -648,11 +652,7 @@ async fn join_room_by_id_helper( Err(_) => continue, }; - let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| { - warn!("{:?}: {}", value, e); - Error::BadServerResponse("Invalid PDU in send_join response.") - })?; - db.rooms.add_pdu_outlier(&pdu)?; + db.rooms.add_pdu_outlier(&event_id, &value)?; } // We append to state before appending the pdu, so we don't have a moment in time with the diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 35c1df7..b714582 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -241,7 +241,7 @@ impl Rooms { kind: &EventType, sender: &UserId, state_key: Option<&str>, - content: serde_json::Value, + content: &serde_json::Value, ) -> Result>> { let auth_events = state_res::auth_types_for_event( kind, @@ -295,7 +295,7 @@ impl Rooms { &self, room_id: &RoomId, state: BTreeMap<(EventType, String), EventId>, - globals: &super::globals::Globals, + db: &Database, ) -> Result<()> { let state_hash = self.calculate_hash( &state @@ -304,57 +304,109 @@ impl Rooms { .collect::>(), ); - let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? { - Some(shortstatehash) => { - // State already existed in db - self.roomid_shortstatehash - .insert(room_id.as_bytes(), &*shortstatehash)?; - return Ok(()); - } - None => { - let shortstatehash = globals.next_count()?; - self.statehash_shortstatehash - .insert(&state_hash, &shortstatehash.to_be_bytes())?; - shortstatehash.to_be_bytes().to_vec() + let (shortstatehash, already_existed) = + match self.statehash_shortstatehash.get(&state_hash)? { + Some(shortstatehash) => ( + utils::u64_from_bytes(&shortstatehash) + .map_err(|_| Error::bad_database("Invalid shortstatehash in db."))?, + true, + ), + None => { + let shortstatehash = db.globals.next_count()?; + self.statehash_shortstatehash + .insert(&state_hash, &shortstatehash.to_be_bytes())?; + (shortstatehash, false) + } + }; + + let new_state = if !already_existed { + let mut new_state = HashSet::new(); + + for ((event_type, state_key), eventid) in state { + new_state.insert(eventid.clone()); + + let mut statekey = event_type.as_ref().as_bytes().to_vec(); + statekey.push(0xff); + statekey.extend_from_slice(&state_key.as_bytes()); + + let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { + Some(shortstatekey) => shortstatekey.to_vec(), + None => { + let shortstatekey = db.globals.next_count()?; + self.statekey_shortstatekey + .insert(&statekey, &shortstatekey.to_be_bytes())?; + shortstatekey.to_be_bytes().to_vec() + } + }; + + let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? { + Some(shorteventid) => shorteventid.to_vec(), + None => { + let shorteventid = db.globals.next_count()?; + self.eventid_shorteventid + .insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?; + self.shorteventid_eventid + .insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?; + shorteventid.to_be_bytes().to_vec() + } + }; + + let mut state_id = shortstatehash.to_be_bytes().to_vec(); + state_id.extend_from_slice(&shortstatekey); + + self.stateid_shorteventid + .insert(&state_id, &*shorteventid)?; } + + new_state + } else { + self.state_full_ids(shortstatehash)?.into_iter().collect() }; - for ((event_type, state_key), eventid) in state { - let mut statekey = event_type.as_ref().as_bytes().to_vec(); - statekey.push(0xff); - statekey.extend_from_slice(&state_key.as_bytes()); + let old_state = self + .current_shortstatehash(&room_id)? + .map(|s| self.state_full_ids(s)) + .transpose()? + .map(|vec| vec.into_iter().collect::>()) + .unwrap_or_default(); - let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { - Some(shortstatekey) => shortstatekey.to_vec(), - None => { - let shortstatekey = globals.next_count()?; - self.statekey_shortstatekey - .insert(&statekey, &shortstatekey.to_be_bytes())?; - shortstatekey.to_be_bytes().to_vec() + for event_id in new_state.difference(&old_state) { + if let Some(pdu) = self.get_pdu_json(event_id)? { + if pdu.get("event_type") + == Some(&CanonicalJsonValue::String("m.room.member".to_owned())) + { + if let Ok(pdu) = serde_json::from_value::( + serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"), + ) { + if let Some(membership) = + pdu.content.get("membership").and_then(|membership| { + serde_json::from_value::( + membership.clone(), + ) + .ok() + }) + { + if let Some(state_key) = pdu + .state_key + .and_then(|state_key| UserId::try_from(state_key).ok()) + { + self.update_membership( + room_id, + &state_key, + membership, + &pdu.sender, + None, + db, + )?; + } + } + } } - }; - - let shorteventid = match self.eventid_shorteventid.get(eventid.as_bytes())? { - Some(shorteventid) => shorteventid.to_vec(), - None => { - let shorteventid = globals.next_count()?; - self.eventid_shorteventid - .insert(eventid.as_bytes(), &shorteventid.to_be_bytes())?; - self.shorteventid_eventid - .insert(&shorteventid.to_be_bytes(), eventid.as_bytes())?; - shorteventid.to_be_bytes().to_vec() - } - }; - - let mut state_id = shortstatehash.clone(); - state_id.extend_from_slice(&shortstatekey); - - self.stateid_shorteventid - .insert(&*state_id, &*shorteventid)?; + } } self.roomid_shortstatehash - .insert(room_id.as_bytes(), &*shortstatehash)?; + .insert(room_id.as_bytes(), &shortstatehash.to_be_bytes())?; Ok(()) } @@ -591,10 +643,10 @@ impl Rooms { /// Append the PDU as an outlier. /// /// Any event given to this will be processed (state-res) on another thread. - pub fn add_pdu_outlier(&self, pdu: &PduEvent) -> Result<()> { + pub fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()> { self.eventid_outlierpdu.insert( - &pdu.event_id.as_bytes(), - &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), + &event_id.as_bytes(), + &*serde_json::to_string(&pdu).expect("CanonicalJsonObject is valid string"), )?; Ok(()) @@ -1193,7 +1245,7 @@ impl Rooms { &event_type, &sender, state_key.as_deref(), - content.clone(), + &content, )?; // Our depth is the maximum depth of prev_events + 1 diff --git a/src/main.rs b/src/main.rs index 31cfaca..3157023 100644 --- a/src/main.rs +++ b/src/main.rs @@ -168,7 +168,10 @@ fn setup_rocket() -> (rocket::Rocket, Config) { server_server::get_event_route, server_server::get_missing_events_route, server_server::get_room_state_ids_route, + server_server::create_join_event_template_route, + server_server::create_join_event_route, server_server::create_invite_route, + server_server::get_room_information_route, server_server::get_profile_information_route, ], ) diff --git a/src/server_server.rs b/src/server_server.rs index 5b49472..ed4be0c 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -14,25 +14,32 @@ use ruma::{ VerifyKey, }, event::{get_event, get_missing_events, get_room_state_ids}, - membership::create_invite, - query::get_profile_information, + membership::{ + create_invite, + create_join_event::{self, RoomState}, + create_join_event_template, + }, + query::{get_profile_information, get_room_information}, transactions::send_transaction_message, }, IncomingResponse, OutgoingRequest, OutgoingResponse, }, directory::{IncomingFilter, IncomingRoomNetwork}, events::{ - room::{create::CreateEventContent, member::MembershipState}, + room::{ + create::CreateEventContent, + member::{MemberEventContent, MembershipState}, + }, EventType, }, serde::{to_canonical_value, Raw}, - signatures::CanonicalJsonValue, - EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, + signatures::{CanonicalJsonObject, CanonicalJsonValue}, + uint, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, }; use state_res::{Event, EventMap, StateMap}; use std::{ collections::{btree_map::Entry, BTreeMap, BTreeSet, HashSet}, - convert::TryFrom, + convert::{TryFrom, TryInto}, fmt::Debug, future::Future, net::{IpAddr, SocketAddr}, @@ -589,8 +596,8 @@ pub async fn send_transaction_message_route<'a>( Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) } -/// An async function that can recursively calls itself. -type AsyncRecursiveResult<'a, T> = Pin> + 'a + Send>>; +/// An async function that can recursively call itself. +type AsyncRecursiveResult<'a, T, E> = Pin> + 'a + Send>>; /// When receiving an event one needs to: /// 0. Skip the PDU if we already know about it @@ -624,13 +631,13 @@ fn handle_incoming_pdu<'a>( db: &'a Database, pub_key_map: &'a RwLock>>, auth_cache: &'a mut EventMap>, -) -> AsyncRecursiveResult<'a, Arc> { +) -> AsyncRecursiveResult<'a, Option>, String> { Box::pin(async move { // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json - // 0. Skip the PDU if we already know about it - if let Ok(Some(pdu)) = db.rooms.get_non_outlier_pdu(&event_id) { - return Ok(Arc::new(pdu)); + // 0. Skip the PDU if we already have it as a timeline event + if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(&event_id) { + return Ok(Some(pdu_id.to_vec())); } // 1. Check the server is in the room @@ -690,6 +697,7 @@ fn handle_incoming_pdu<'a>( } Ok(ruma::signatures::Verified::Signatures) => { // Redact + warn!("Calculated hash does not match: {}", event_id); match ruma::signatures::redact(&value, &room_version) { Ok(obj) => obj, Err(_) => return Err("Redaction failed".to_string()), @@ -705,7 +713,7 @@ fn handle_incoming_pdu<'a>( to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"), ); let incoming_pdu = serde_json::from_value::( - serde_json::to_value(val).expect("CanonicalJsonObj is a valid JsonValue"), + serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"), ) .map_err(|_| "Event is not a valid PDU.".to_string())?; @@ -793,13 +801,13 @@ fn handle_incoming_pdu<'a>( // 7. Persist the event as an outlier. db.rooms - .add_pdu_outlier(&incoming_pdu) + .add_pdu_outlier(&incoming_pdu.event_id, &val) .map_err(|_| "Failed to add pdu as outlier.".to_owned())?; debug!("Added pdu as outlier."); // 8. if not timeline event: stop if !is_timeline_event { - return Ok(incoming_pdu); + return Ok(None); } // TODO: 9. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events @@ -811,7 +819,54 @@ fn handle_incoming_pdu<'a>( // the state from a known point and resolve if > 1 prev_event debug!("Requesting state at event."); - let (state_at_incoming_event, incoming_auth_events): (StateMap>, Vec>) = + let mut state_at_incoming_event = None; + let mut incoming_auth_events = Vec::new(); + + if incoming_pdu.prev_events.len() == 1 { + let prev_event = &incoming_pdu.prev_events[0]; + let state_vec = db + .rooms + .pdu_shortstatehash(prev_event) + .map_err(|_| "Failed talking to db".to_owned())? + .map(|shortstatehash| db.rooms.state_full_ids(shortstatehash).ok()) + .flatten(); + if let Some(mut state_vec) = state_vec { + if db + .rooms + .get_pdu(prev_event) + .ok() + .flatten() + .ok_or_else(|| "Could not find prev event, but we know the state.".to_owned())? + .state_key + .is_some() + { + state_vec.push(prev_event.clone()); + } + state_at_incoming_event = Some( + fetch_and_handle_events(db, origin, &state_vec, pub_key_map, auth_cache) + .await + .map_err(|_| "Failed to fetch state events locally".to_owned())? + .into_iter() + .map(|pdu| { + ( + ( + pdu.kind.clone(), + pdu.state_key + .clone() + .expect("events from state_full_ids are state events"), + ), + pdu, + ) + }) + .collect(), + ); + } + &state_at_incoming_event; + + // TODO: set incoming_auth_events? + } + + if state_at_incoming_event.is_none() { // Call /state_ids to find out what the state at this pdu is. We trust the server's // response to some extend, but we still do a lot of checks on the events match db @@ -856,12 +911,16 @@ fn handle_incoming_pdu<'a>( } // The original create event must still be in the state - if state.get(&(EventType::RoomCreate, "".to_owned())).map(|a| a.as_ref()) != Some(&create_event) { + if state + .get(&(EventType::RoomCreate, "".to_owned())) + .map(|a| a.as_ref()) + != Some(&create_event) + { return Err("Incoming event refers to wrong create event.".to_owned()); } debug!("Fetching auth chain events at event."); - let incoming_auth_events = match fetch_and_handle_events( + incoming_auth_events = match fetch_and_handle_events( &db, origin, &res.auth_chain_ids, @@ -874,12 +933,16 @@ fn handle_incoming_pdu<'a>( Err(_) => return Err("Failed to fetch auth chain.".to_owned()), }; - (state, incoming_auth_events) + state_at_incoming_event = Some(state); } Err(_) => { return Err("Fetching state for event failed".into()); } }; + } + + let state_at_incoming_event = + state_at_incoming_event.expect("we always set this to some above"); // 11. Check the auth of the event passes based on the state of the event if !state_res::event_auth::auth_check( @@ -1079,20 +1142,26 @@ fn handle_incoming_pdu<'a>( // Now that the event has passed all auth it is added into the timeline. // We use the `state_at_event` instead of `state_after` so we accurately // represent the state for this event. - append_incoming_pdu(&db, &incoming_pdu, extremities, &state_at_incoming_event) - .map_err(|_| "Failed to add pdu to db.".to_owned())?; + let pdu_id = append_incoming_pdu( + &db, + &incoming_pdu, + val, + extremities, + &state_at_incoming_event, + ) + .map_err(|_| "Failed to add pdu to db.".to_owned())?; debug!("Appended incoming pdu."); // Set the new room state to the resolved state if update_state { db.rooms - .force_state(&room_id, new_room_state, &db.globals) + .force_state(&room_id, new_room_state, &db) .map_err(|_| "Failed to set new room state.".to_owned())?; } debug!("Updated resolved state"); // Event has passed all auth/stateres checks - Ok(incoming_pdu) + Ok(Some(pdu_id)) }) } @@ -1108,77 +1177,93 @@ fn handle_incoming_pdu<'a>( /// If the event is unknown to the `auth_cache` it is added. This guarantees that any /// event we need to know of will be present. //#[tracing::instrument(skip(db, key_map, auth_cache))] -pub(crate) async fn fetch_and_handle_events( - db: &Database, - origin: &ServerName, - events: &[EventId], - pub_key_map: &RwLock>>, - auth_cache: &mut EventMap>, -) -> Result>> { - let mut pdus = vec![]; - for id in events { - // a. Look at auth cache - let pdu = match auth_cache.get(id) { - Some(pdu) => { - debug!("Found {} in cache", id); - pdu.clone() - } - // b. Look in the main timeline (pduid_pdu tree) - // c. Look at outlier pdu tree - // (get_pdu checks both) - None => match db.rooms.get_pdu(&id)? { - Some(pdu) => { - debug!("Found {} in outliers", id); - Arc::new(pdu) - } - None => { - // d. Ask origin server over federation - debug!("Fetching {} over federation.", id); - match db - .sending - .send_federation_request( - &db.globals, - origin, - get_event::v1::Request { event_id: &id }, - ) - .await - { - Ok(res) => { - debug!("Got {} over federation: {:?}", id, res); - let (event_id, value) = - crate::pdu::gen_event_id_canonical_json(&res.pdu)?; - let pdu = match handle_incoming_pdu( - origin, - &event_id, - value, - false, +pub(crate) fn fetch_and_handle_events<'a>( + db: &'a Database, + origin: &'a ServerName, + events: &'a [EventId], + pub_key_map: &'a RwLock>>, + auth_cache: &'a mut EventMap>, +) -> AsyncRecursiveResult<'a, Vec>, Error> { + Box::pin(async move { + let mut pdus = vec![]; + for id in events { + // a. Look at auth cache + let pdu = + match auth_cache.get(id) { + Some(pdu) => { + debug!("Found {} in cache", id); + // We already have the auth chain for events in cache + pdu.clone() + } + // b. Look in the main timeline (pduid_pdu tree) + // c. Look at outlier pdu tree + // (get_pdu checks both) + None => match db.rooms.get_pdu(&id)? { + Some(pdu) => { + debug!("Found {} in db", id); + // We need to fetch the auth chain + let _ = fetch_and_handle_events( db, + origin, + &pdu.auth_events, pub_key_map, auth_cache, ) - .await + .await?; + Arc::new(pdu) + } + None => { + // d. Ask origin server over federation + debug!("Fetching {} over federation.", id); + match db + .sending + .send_federation_request( + &db.globals, + origin, + get_event::v1::Request { event_id: &id }, + ) + .await { - Ok(pdu) => pdu, - Err(e) => { - warn!("Authentication of event {} failed: {:?}", id, e); + Ok(res) => { + debug!("Got {} over federation: {:?}", id, res); + let (event_id, value) = + crate::pdu::gen_event_id_canonical_json(&res.pdu)?; + // This will also fetch the auth chain + match handle_incoming_pdu( + origin, + &event_id, + value.clone(), + false, + db, + pub_key_map, + auth_cache, + ) + .await + { + Ok(_) => Arc::new(serde_json::from_value( + serde_json::to_value(value) + .expect("canonicaljsonobject is valid value"), + ) + .expect("This is possible because handle_incoming_pdu worked")), + Err(e) => { + warn!("Authentication of event {} failed: {:?}", id, e); + continue; + } + } + } + Err(_) => { + warn!("Failed to fetch event: {}", id); continue; } - }; - - pdu + } } - Err(_) => { - warn!("Failed to fetch event: {}", id); - continue; - } - } - } - }, - }; - auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone()); - pdus.push(pdu); - } - Ok(pdus) + }, + }; + auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone()); + pdus.push(pdu); + } + Ok(pdus) + }) } /// Search the DB for the signing keys of the given server, if we don't have them @@ -1280,9 +1365,10 @@ pub(crate) async fn fetch_signing_keys( pub(crate) fn append_incoming_pdu( db: &Database, pdu: &PduEvent, + pdu_json: CanonicalJsonObject, new_room_leaves: HashSet, state: &StateMap>, -) -> Result<()> { +) -> Result> { let count = db.globals.next_count()?; let mut pdu_id = pdu.room_id.as_bytes().to_vec(); pdu_id.push(0xff); @@ -1295,7 +1381,7 @@ pub(crate) fn append_incoming_pdu( db.rooms.append_pdu( pdu, - utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"), + pdu_json, count, pdu_id.clone().into(), &new_room_leaves.into_iter().collect::>(), @@ -1366,7 +1452,7 @@ pub(crate) fn append_incoming_pdu( } } - Ok(()) + Ok(pdu_id) } #[cfg_attr( @@ -1495,6 +1581,291 @@ pub fn get_room_state_ids_route<'a>( .into()) } +#[cfg_attr( + feature = "conduit_bin", + get("/_matrix/federation/v1/make_join/<_>/<_>", data = "") +)] +#[tracing::instrument(skip(db, body))] +pub fn create_join_event_template_route<'a>( + db: State<'a, Database>, + body: Ruma>, +) -> ConduitResult { + if !db.globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + if !db.rooms.exists(&body.room_id)? { + return Err(Error::BadRequest( + ErrorKind::NotFound, + "Server is not in room.", + )); + } + + if !body.ver.contains(&RoomVersionId::Version6) { + return Err(Error::BadRequest( + ErrorKind::IncompatibleRoomVersion { + room_version: RoomVersionId::Version6, + }, + "Room version not supported.", + )); + } + + let prev_events = db + .rooms + .get_pdu_leaves(&body.room_id)? + .into_iter() + .take(20) + .collect::>(); + + let create_event = db + .rooms + .room_state_get(&body.room_id, &EventType::RoomCreate, "")?; + + let create_event_content = create_event + .as_ref() + .map(|create_event| { + Ok::<_, Error>( + serde_json::from_value::>(create_event.content.clone()) + .expect("Raw::from_value always works.") + .deserialize() + .map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?, + ) + }) + .transpose()?; + + let create_prev_event = if prev_events.len() == 1 + && Some(&prev_events[0]) == create_event.as_ref().map(|c| &c.event_id) + { + create_event.map(Arc::new) + } else { + None + }; + + // If there was no create event yet, assume we are creating a version 6 room right now + let room_version = create_event_content.map_or(RoomVersionId::Version6, |create_event| { + create_event.room_version + }); + + let content = serde_json::to_value(MemberEventContent { + avatar_url: None, + displayname: None, + is_direct: None, + membership: MembershipState::Join, + third_party_invite: None, + }) + .expect("member event is valid value"); + + let state_key = body.user_id.to_string(); + let kind = EventType::RoomMember; + + let auth_events = db.rooms.get_auth_events( + &body.room_id, + &kind, + &body.user_id, + Some(&state_key), + &content, + )?; + + // Our depth is the maximum depth of prev_events + 1 + let depth = prev_events + .iter() + .filter_map(|event_id| Some(db.rooms.get_pdu(event_id).ok()??.depth)) + .max() + .unwrap_or_else(|| uint!(0)) + + uint!(1); + + let mut unsigned = BTreeMap::new(); + + if let Some(prev_pdu) = db.rooms.room_state_get(&body.room_id, &kind, &state_key)? { + unsigned.insert("prev_content".to_owned(), prev_pdu.content); + unsigned.insert( + "prev_sender".to_owned(), + serde_json::to_value(prev_pdu.sender).expect("UserId::to_value always works"), + ); + } + + let pdu = PduEvent { + event_id: ruma::event_id!("$thiswillbefilledinlater"), + room_id: body.room_id.clone(), + sender: body.user_id.clone(), + origin_server_ts: utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"), + kind, + content, + state_key: Some(state_key), + prev_events, + depth, + auth_events: auth_events + .iter() + .map(|(_, pdu)| pdu.event_id.clone()) + .collect(), + redacts: None, + unsigned, + hashes: ruma::events::pdu::EventHash { + sha256: "aaa".to_owned(), + }, + signatures: BTreeMap::new(), + }; + + let auth_check = state_res::auth_check( + &room_version, + &Arc::new(pdu.clone()), + create_prev_event, + &auth_events, + None, // TODO: third_party_invite + ) + .map_err(|e| { + error!("{:?}", e); + Error::bad_database("Auth check failed.") + })?; + + if !auth_check { + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Event is not authorized.", + )); + } + + // Hash and sign + let mut pdu_json = + utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); + + pdu_json.remove("event_id"); + + // Add origin because synapse likes that (and it's required in the spec) + pdu_json.insert( + "origin".to_owned(), + to_canonical_value(db.globals.server_name()) + .expect("server name is a valid CanonicalJsonValue"), + ); + + Ok(create_join_event_template::v1::Response { + room_version: Some(RoomVersionId::Version6), + event: serde_json::from_value::>( + serde_json::to_value(pdu_json).expect("CanonicalJson is valid serde_json::Value"), + ) + .expect("Raw::from_value always works"), + } + .into()) +} + +#[cfg_attr( + feature = "conduit_bin", + put("/_matrix/federation/v2/send_join/<_>/<_>", data = "") +)] +#[tracing::instrument(skip(db, body))] +pub async fn create_join_event_route<'a>( + db: State<'a, Database>, + body: Ruma>, +) -> ConduitResult { + if !db.globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + // We need to return the state prior to joining, let's keep a reference to that here + let shortstatehash = + db.rooms + .current_shortstatehash(&body.room_id)? + .ok_or(Error::BadRequest( + ErrorKind::NotFound, + "Pdu state not found.", + ))?; + + let pub_key_map = RwLock::new(BTreeMap::new()); + let mut auth_cache = EventMap::new(); + + // We do not add the event_id field to the pdu here because of signature and hashes checks + let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&body.pdu) { + Ok(t) => t, + Err(_) => { + // Event could not be converted to canonical json + return Err(Error::BadRequest( + ErrorKind::InvalidParam, + "Could not convert event to canonical json.", + )); + } + }; + + let origin = serde_json::from_value::>( + serde_json::to_value(value.get("origin").ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Event needs an origin field.", + ))?) + .expect("CanonicalJson is valid json value"), + ) + .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; + + let pdu_id = handle_incoming_pdu( + &origin, + &event_id, + value, + true, + &db, + &pub_key_map, + &mut auth_cache, + ) + .await + .map_err(|_| { + Error::BadRequest( + ErrorKind::InvalidParam, + "Error while handling incoming PDU.", + ) + })? + .ok_or(Error::BadRequest( + ErrorKind::InvalidParam, + "Could not accept incoming PDU as timeline event.", + ))?; + + let state_ids = db.rooms.state_full_ids(shortstatehash)?; + + let mut auth_chain_ids = BTreeSet::::new(); + let mut todo = state_ids.iter().cloned().collect::>(); + + while let Some(event_id) = todo.iter().next().cloned() { + if let Some(pdu) = db.rooms.get_pdu(&event_id)? { + todo.extend( + pdu.auth_events + .clone() + .into_iter() + .collect::>() + .difference(&auth_chain_ids) + .cloned(), + ); + auth_chain_ids.extend(pdu.auth_events.into_iter()); + } else { + warn!("Could not find pdu mentioned in auth events."); + } + + todo.remove(&event_id); + } + + for server in db + .rooms + .room_servers(&body.room_id) + .filter_map(|r| r.ok()) + .filter(|server| &**server != db.globals.server_name()) + { + db.sending.send_pdu(&server, &pdu_id)?; + } + + Ok(create_join_event::v2::Response { + room_state: RoomState { + auth_chain: auth_chain_ids + .iter() + .filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten()) + .map(|json| PduEvent::convert_to_outgoing_federation_event(json)) + .collect(), + state: state_ids + .iter() + .filter_map(|id| db.rooms.get_pdu_json(&id).ok().flatten()) + .map(|json| PduEvent::convert_to_outgoing_federation_event(json)) + .collect(), + }, + } + .into()) +} + #[cfg_attr( feature = "conduit_bin", put("/_matrix/federation/v2/invite/<_>/<_>", data = "") @@ -1504,6 +1875,10 @@ pub async fn create_invite_route<'a>( db: State<'a, Database>, body: Ruma, ) -> ConduitResult { + if !db.globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + if body.room_version < RoomVersionId::Version6 { return Err(Error::BadRequest( ErrorKind::IncompatibleRoomVersion { @@ -1599,6 +1974,31 @@ pub async fn create_invite_route<'a>( .into()) } +#[cfg_attr( + feature = "conduit_bin", + get("/_matrix/federation/v1/query/directory", data = "") +)] +#[tracing::instrument(skip(db, body))] +pub fn get_room_information_route<'a>( + db: State<'a, Database>, + body: Ruma>, +) -> ConduitResult { + if !db.globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let room_id = db + .rooms + .id_from_alias(&body.room_alias)? + .ok_or_else(|| Error::BadRequest(ErrorKind::NotFound, "Room alias not found."))?; + + Ok(get_room_information::v1::Response { + room_id, + servers: vec![db.globals.server_name().to_owned()], + } + .into()) +} + #[cfg_attr( feature = "conduit_bin", get("/_matrix/federation/v1/query/profile", data = "")