Merge branch 'badeventfix' into 'master'

fix: handle bad events in db better

See merge request famedly/conduit!148
next
Timo Kösters 2021-08-08 19:48:08 +00:00
commit 1cbc61c558
2 changed files with 41 additions and 37 deletions

View File

@ -270,8 +270,8 @@ impl Database {
eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?, eventid_outlierpdu: builder.open_tree("eventid_outlierpdu")?,
referencedevents: builder.open_tree("referencedevents")?, referencedevents: builder.open_tree("referencedevents")?,
pdu_cache: Mutex::new(LruCache::new(0)), pdu_cache: Mutex::new(LruCache::new(1_000_000)),
auth_chain_cache: Mutex::new(LruCache::new(0)), auth_chain_cache: Mutex::new(LruCache::new(1_000_000)),
}, },
account_data: account_data::AccountData { account_data: account_data::AccountData {
roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?, roomuserdataid_accountdata: builder.open_tree("roomuserdataid_accountdata")?,

View File

@ -666,14 +666,12 @@ pub async fn send_transaction_message_route(
drop(mutex_lock); drop(mutex_lock);
let elapsed = start_time.elapsed(); let elapsed = start_time.elapsed();
if elapsed > Duration::from_secs(1) { warn!(
warn!( "Handling event {} took {}m{}s",
"Handling event {} took {}m{}s", event_id,
event_id, elapsed.as_secs() / 60,
elapsed.as_secs() / 60, elapsed.as_secs() % 60
elapsed.as_secs() % 60 );
);
}
} }
for pdu in &resolved_map { for pdu in &resolved_map {
@ -1271,7 +1269,6 @@ pub fn handle_incoming_pdu<'a>(
} else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) { } else if fork_states.iter().skip(1).all(|f| &fork_states[0] == f) {
// There was only one state, so it has to be the room's current state (because that is // There was only one state, so it has to be the room's current state (because that is
// always included) // always included)
warn!("Skipping stateres because there is no new state.");
fork_states[0] fork_states[0]
.iter() .iter()
.map(|(k, pdu)| (k.clone(), pdu.event_id.clone())) .map(|(k, pdu)| (k.clone(), pdu.event_id.clone()))
@ -1411,12 +1408,12 @@ pub(crate) fn fetch_and_handle_events<'a>(
// a. Look in the main timeline (pduid_pdu tree) // a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree // b. Look at outlier pdu tree
// (get_pdu checks both) // (get_pdu checks both)
let pdu = match db.rooms.get_pdu(&id)? { let pdu = match db.rooms.get_pdu(&id) {
Some(pdu) => { Ok(Some(pdu)) => {
trace!("Found {} in db", id); trace!("Found {} in db", id);
pdu pdu
} }
None => { Ok(None) => {
// c. Ask origin server over federation // c. Ask origin server over federation
debug!("Fetching {} over federation.", id); debug!("Fetching {} over federation.", id);
match db match db
@ -1431,7 +1428,11 @@ pub(crate) fn fetch_and_handle_events<'a>(
Ok(res) => { Ok(res) => {
debug!("Got {} over federation", id); debug!("Got {} over federation", id);
let (event_id, mut value) = let (event_id, mut value) =
crate::pdu::gen_event_id_canonical_json(&res.pdu)?; match crate::pdu::gen_event_id_canonical_json(&res.pdu) {
Ok(t) => t,
Err(_) => continue,
};
// This will also fetch the auth chain // This will also fetch the auth chain
match handle_incoming_pdu( match handle_incoming_pdu(
origin, origin,
@ -1474,6 +1475,10 @@ pub(crate) fn fetch_and_handle_events<'a>(
} }
} }
} }
Err(e) => {
debug!("Error loading {}: {}", id, e);
continue;
}
}; };
pdus.push(pdu); pdus.push(pdu);
} }
@ -1728,44 +1733,47 @@ fn get_auth_chain(starting_events: Vec<EventId>, db: &Database) -> Result<HashSe
let mut full_auth_chain = HashSet::new(); let mut full_auth_chain = HashSet::new();
let mut cache = db.rooms.auth_chain_cache(); let mut cache = db.rooms.auth_chain_cache();
if let Some(cached) = cache.get_mut(&starting_events) {
return Ok(cached.clone());
}
for event_id in &starting_events { for event_id in &starting_events {
if let Some(cached) = cache.get_mut(&[event_id.clone()][..]) { if let Some(cached) = cache.get_mut(&[event_id.clone()][..]) {
full_auth_chain.extend(cached.iter().cloned()); full_auth_chain.extend(cached.iter().cloned());
} else { } else {
drop(cache); drop(cache);
let auth_chain = get_auth_chain_recursive(&event_id, HashSet::new(), db)?; let mut auth_chain = HashSet::new();
get_auth_chain_recursive(&event_id, &mut auth_chain, db)?;
cache = db.rooms.auth_chain_cache(); cache = db.rooms.auth_chain_cache();
cache.insert(vec![event_id.clone()], auth_chain.clone()); cache.insert(vec![event_id.clone()], auth_chain.clone());
full_auth_chain.extend(auth_chain); full_auth_chain.extend(auth_chain);
}; };
} }
cache.insert(starting_events, full_auth_chain.clone());
Ok(full_auth_chain) Ok(full_auth_chain)
} }
fn get_auth_chain_recursive( fn get_auth_chain_recursive(
event_id: &EventId, event_id: &EventId,
mut found: HashSet<EventId>, found: &mut HashSet<EventId>,
db: &Database, db: &Database,
) -> Result<HashSet<EventId>> { ) -> Result<()> {
if let Some(pdu) = db.rooms.get_pdu(&event_id)? { let r = db.rooms.get_pdu(&event_id);
for auth_event in &pdu.auth_events { match r {
if !found.contains(auth_event) { Ok(Some(pdu)) => {
found.insert(auth_event.clone()); for auth_event in &pdu.auth_events {
found = get_auth_chain_recursive(&auth_event, found, db)?; if !found.contains(auth_event) {
found.insert(auth_event.clone());
get_auth_chain_recursive(&auth_event, found, db)?;
}
} }
} }
} else { Ok(None) => {
warn!("Could not find pdu mentioned in auth events."); warn!("Could not find pdu mentioned in auth events.");
}
Err(e) => {
warn!("Could not load event in auth chain: {}", e);
}
} }
Ok(found) Ok(())
} }
#[cfg_attr( #[cfg_attr(
@ -1860,12 +1868,8 @@ pub fn get_event_authorization_route(
Ok(get_event_authorization::v1::Response { Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids auth_chain: auth_chain_ids
.into_iter() .into_iter()
.map(|id| { .filter_map(|id| Some(db.rooms.get_pdu_json(&id).ok()??))
Ok::<_, Error>(PduEvent::convert_to_outgoing_federation_event( .map(|event| PduEvent::convert_to_outgoing_federation_event(event))
db.rooms.get_pdu_json(&id)?.unwrap(),
))
})
.filter_map(|r| r.ok())
.collect(), .collect(),
} }
.into()) .into())