Fiter PDU's before main incoming PDU loop

next
Devin Ragotzy 2021-02-01 17:02:56 -05:00
parent 64374b4679
commit 591769d5f3
2 changed files with 66 additions and 48 deletions

View File

@ -466,16 +466,7 @@ impl Rooms {
/// Returns true if the event_id was previously inserted. /// Returns true if the event_id was previously inserted.
pub fn append_pdu_outlier(&self, pdu: &PduEvent) -> Result<bool> { pub fn append_pdu_outlier(&self, pdu: &PduEvent) -> Result<bool> {
log::info!( log::info!("Number of outlier pdu's {}", self.eventid_outlierpdu.len());
"Number of outlier pdu's {:#?}",
self.eventid_outlierpdu
.iter()
.map(|pair| {
let (_k, v) = pair.unwrap();
serde_json::from_slice::<PduBuilder>(&v).unwrap()
})
.collect::<Vec<_>>()
);
let mut key = pdu.room_id().as_bytes().to_vec(); let mut key = pdu.room_id().as_bytes().to_vec();
key.push(0xff); key.push(0xff);

View File

@ -533,6 +533,54 @@ pub async fn send_transaction_message_route<'a>(
} }
} }
let mut resolved_map = BTreeMap::new();
let pdus_to_resolve = body
.pdus
.iter()
.filter_map(|pdu| {
// 1. Is a valid event, otherwise it is dropped.
// Ruma/PduEvent/StateEvent satisfies this
// We do not add the event_id field to the pdu here because of signature and hashes checks
let (event_id, value) = crate::pdu::gen_event_id_canonical_json(pdu);
// If we have no idea about this room skip the PDU
let room_id = match value
.get("room_id")
.map(|id| match id {
CanonicalJsonValue::String(id) => RoomId::try_from(id.as_str()).ok(),
_ => None,
})
.flatten()
{
Some(id) => id,
None => {
resolved_map.insert(event_id, Err("Event needs a valid RoomId".to_string()));
return None;
}
};
// 1. check the server is in the room (optional)
match db.rooms.exists(&room_id) {
Ok(true) => {}
_ => {
resolved_map
.insert(event_id, Err("Room is unknown to this server".to_string()));
return None;
}
}
// If we know of this pdu we don't need to continue processing it
//
// This check is essentially
if let Ok(Some(_)) = db.rooms.get_pdu_id(&event_id) {
return None;
}
Some((event_id, value))
})
.collect::<Vec<_>>();
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere? // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?
// SPEC: // SPEC:
// Servers MUST strictly enforce the JSON format specified in the appendices. // Servers MUST strictly enforce the JSON format specified in the appendices.
@ -540,35 +588,7 @@ pub async fn send_transaction_message_route<'a>(
// events over federation. For example, the Federation API's /send endpoint would // events over federation. For example, the Federation API's /send endpoint would
// discard the event whereas the Client Server API's /send/{eventType} endpoint // discard the event whereas the Client Server API's /send/{eventType} endpoint
// would return a M_BAD_JSON error. // would return a M_BAD_JSON error.
let mut resolved_map = BTreeMap::new(); 'main_pdu_loop: for (event_id, value) in pdus_to_resolve {
'main_pdu_loop: for pdu in &body.pdus {
// 1. Is a valid event, otherwise it is dropped.
// Ruma/PduEvent/StateEvent satisfies this
// We do not add the event_id field to the pdu here because of signature and hashes checks
let (event_id, value) = crate::pdu::gen_event_id_canonical_json(pdu);
// If we have no idea about this room skip the PDU
let room_id = match value
.get("room_id")
.map(|id| match id {
CanonicalJsonValue::String(id) => RoomId::try_from(id.as_str()).ok(),
_ => None,
})
.flatten()
{
Some(id) => id,
None => {
resolved_map.insert(event_id, Err("Event needs a valid RoomId".to_string()));
continue;
}
};
// 1. check the server is in the room (optional)
if !db.rooms.exists(&room_id)? {
resolved_map.insert(event_id, Err("Room is unknown to this server".to_string()));
continue;
}
let server_name = &body.body.origin; let server_name = &body.body.origin;
let mut pub_key_map = BTreeMap::new(); let mut pub_key_map = BTreeMap::new();
@ -921,13 +941,13 @@ pub async fn send_transaction_message_route<'a>(
)?; )?;
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
resolved_map.insert(pdu.event_id().clone(), Ok(()));
} }
Ok(send_transaction_message::v1::Response { if !resolved_map.is_empty() {
pdus: dbg!(resolved_map), warn!("These PDU's failed {:?}", resolved_map);
} }
.into())
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
} }
/// An async function that can recursively calls itself. /// An async function that can recursively calls itself.
@ -1139,6 +1159,7 @@ pub(crate) async fn calculate_forward_extremities(
// Make sure the incoming event is not already a forward extremity // Make sure the incoming event is not already a forward extremity
// FIXME: I think this could happen if different servers send us the same event?? // FIXME: I think this could happen if different servers send us the same event??
if current_leaves.contains(pdu.event_id()) { if current_leaves.contains(pdu.event_id()) {
error!("The incoming event is already present in get_pdu_leaves BUG");
is_incoming_leaf = false; is_incoming_leaf = false;
// Not sure what to do here // Not sure what to do here
} }
@ -1147,11 +1168,12 @@ pub(crate) async fn calculate_forward_extremities(
// then do nothing - it's not a candidate to be a new extremity if // then do nothing - it's not a candidate to be a new extremity if
// it has been referenced. // it has been referenced.
// //
// We first check if know of the event and then don't include it as a forward // We check this in the filter just before the main incoming PDU for loop
// extremity if it is a timeline event // so no already known event can make it this far.
if db.rooms.get_pdu_id(pdu.event_id())?.is_some() { //
is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some(); // if db.rooms.get_pdu_id(pdu.event_id())?.is_some() {
} // is_incoming_leaf = db.rooms.get_pdu_outlier(pdu.event_id())?.is_some();
// }
// TODO: // TODO:
// [dendrite] Checks if any other leaves have been referenced and removes them // [dendrite] Checks if any other leaves have been referenced and removes them
@ -1219,7 +1241,12 @@ pub(crate) async fn build_forward_extremity_snapshots(
} }
fork_states.insert(state_before); fork_states.insert(state_before);
} else if id == pdu.event_id() {
// We add this snapshot after `build_forward_extremity_snapshots` is
// called which we requested from the sending server
} else { } else {
error!("Missing state snapshot for {:?} - {:?}", id, pdu.kind());
let res = db let res = db
.sending .sending
.send_federation_request( .send_federation_request(