diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 1159185..99c0b62 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -643,8 +643,6 @@ async fn join_room_by_id_helper( ) .expect("iterative auth check failed on resolved events"); - let mut state = HashMap::new(); - // filter the events that failed the auth check keeping the remaining events // sorted correctly for ev_id in sorted_event_ids @@ -660,24 +658,20 @@ async fn join_room_by_id_helper( let mut pdu_id = room_id.as_bytes().to_vec(); pdu_id.push(0xff); pdu_id.extend_from_slice(&count.to_be_bytes()); + + let hash = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; + db.rooms.append_pdu( &pdu, utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"), count, pdu_id.clone().into(), - // TODO: can we simplify the DAG or should we copy it exactly?? &pdu.prev_events, &db, )?; - if state_events.contains(ev_id) { - if let Some(key) = &pdu.state_key { - state.insert((pdu.kind(), key.to_string()), pdu_id); - } - } + db.rooms.set_room_state(room_id, &hash)?; } - - db.rooms.force_state(room_id, state, &db.globals)?; } else { let event = member::MemberEventContent { membership: member::MembershipState::Join, diff --git a/src/database.rs b/src/database.rs index 3fb8442..35b7bcd 100644 --- a/src/database.rs +++ b/src/database.rs @@ -161,6 +161,7 @@ impl Database { roomid_statehash: db.open_tree("roomid_statehash")?, roomeventid_outlierpdu: db.open_tree("roomeventid_outlierpdu")?, prevevent_parent: db.open_tree("prevevent_parent")?, + roomeventid_outlierpducount: db.open_tree("roomeventid_outlierpducount")?, }, account_data: account_data::AccountData { roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, diff --git a/src/database/rooms.rs b/src/database/rooms.rs index abe8c65..43d5f7d 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -69,6 +69,10 @@ pub struct Rooms { /// RoomId + EventId -> outlier PDU. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. pub(super) roomeventid_outlierpdu: sled::Tree, + /// RoomId + EventId -> count of the last known pdu when the outlier was inserted. + /// This allows us to skip any state snapshots that would for sure not have the outlier. + pub(super) roomeventid_outlierpducount: sled::Tree, + /// RoomId + EventId -> Parent PDU EventId. pub(super) prevevent_parent: sled::Tree, } @@ -323,6 +327,15 @@ impl Rooms { .map_or(Ok(None), |pdu_id| self.pdu_count(&pdu_id).map(Some)) } + pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result { + self.pduid_pdu + .scan_prefix(room_id.as_bytes()) + .last() + .map(|b| self.pdu_count(&b?.0)) + .transpose() + .map(|op| op.unwrap_or_default()) + } + /// Returns the json of a pdu. pub fn get_pdu_json(&self, event_id: &EventId) -> Result> { self.eventid_pduid @@ -490,6 +503,8 @@ impl Rooms { &key, &*serde_json::to_string(&pdu).expect("PduEvent is always a valid String"), )?; + self.roomeventid_outlierpducount + .insert(&key, &self.latest_pdu_count(pdu.room_id())?.to_be_bytes())?; Ok(()) } @@ -537,7 +552,45 @@ impl Rooms { let mut key = pdu.room_id().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(pdu.event_id().as_bytes()); - self.roomeventid_outlierpdu.remove(key)?; + if self.roomeventid_outlierpdu.remove(&key)?.is_some() { + if let Some(state_key) = pdu.state_key.as_deref() { + let mut statekey = pdu.kind().as_ref().as_bytes().to_vec(); + statekey.extend_from_slice(state_key.as_bytes()); + + let short = match self.statekey_short.get(&statekey)? { + Some(short) => utils::u64_from_bytes(&short).map_err(|_| { + Error::bad_database("Invalid short bytes in statekey_short.") + })?, + None => { + error!( + "This event has been inserted into the state snapshot tree previously." + ); + let short = db.globals.next_count()?; + self.statekey_short + .insert(&statekey, &short.to_be_bytes())?; + short + } + }; + + let mut start = pdu.room_id().as_bytes().to_vec(); + start.extend_from_slice( + &self + .roomeventid_outlierpducount + .get(&key)? + .unwrap_or_default(), + ); + for hash in self.pduid_statehash.range(start..).values() { + let mut hash = hash?.to_vec(); + hash.extend_from_slice(&short.to_be_bytes()); + + let _ = self.stateid_pduid.compare_and_swap( + hash, + Some(pdu.event_id().as_bytes()), + Some(pdu_id.as_ref()), + )?; + } + } + } // We must keep track of all events that have been referenced. for leaf in leaves { diff --git a/src/server_server.rs b/src/server_server.rs index 48d5956..780109c 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -971,6 +971,7 @@ fn validate_event<'a>( } } Err(_e) => { + error!("{}", _e); return Err("Signature verification failed".to_string()); } }; @@ -988,7 +989,7 @@ fn validate_event<'a>( fetch_check_auth_events(db, origin, pub_key_map, &pdu.auth_events, auth_cache) .await - .map_err(|_| "Event failed auth chain check".to_string())?; + .map_err(|e| e.to_string())?; let pdu = Arc::new(pdu.clone()); @@ -1064,6 +1065,7 @@ async fn fetch_check_auth_events( /// Find the event and auth it. Once the event is validated (steps 1 - 8) /// it is appended to the outliers Tree. /// +/// 0. Look in the auth_cache /// 1. Look in the main timeline (pduid_pdu tree) /// 2. Look at outlier pdu tree /// 3. Ask origin server over federation @@ -1080,28 +1082,35 @@ pub(crate) async fn fetch_events( ) -> Result>> { let mut pdus = vec![]; for id in events { - // `get_pdu` checks the outliers tree for us - let pdu = match db.rooms.get_pdu(&id)? { - Some(pdu) => Arc::new(pdu), - None => match db - .sending - .send_federation_request( - &db.globals, - origin, - get_event::v1::Request { event_id: &id }, - ) - .await - { - Ok(res) => { - let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu); - let (pdu, _) = validate_event(db, value, event_id, key_map, origin, auth_cache) - .await - .map_err(|_| Error::Conflict("Authentication of event failed"))?; + let pdu = match auth_cache.get(id) { + Some(pdu) => pdu.clone(), + // `get_pdu` checks the outliers tree for us + None => match db.rooms.get_pdu(&id)? { + Some(pdu) => Arc::new(pdu), + None => match db + .sending + .send_federation_request( + &db.globals, + origin, + get_event::v1::Request { event_id: &id }, + ) + .await + { + Ok(res) => { + let (event_id, value) = crate::pdu::gen_event_id_canonical_json(&res.pdu); + let (pdu, _) = + validate_event(db, value, event_id, key_map, origin, auth_cache) + .await + .map_err(|e| { + error!("{:?}", e); + Error::Conflict("Authentication of event failed") + })?; - db.rooms.append_pdu_outlier(&pdu)?; - pdu - } - Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), + db.rooms.append_pdu_outlier(&pdu)?; + pdu + } + Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")), + }, }, }; auth_cache.entry(id.clone()).or_insert_with(|| pdu.clone());