Use the auth_events for step 6, WIP forward_extremity_ids fn
parent
5a8041969d
commit
96dc6be14b
|
@ -569,7 +569,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
// discard the event whereas the Client Server API's /send/{eventType} endpoint
|
||||
// would return a M_BAD_JSON error.
|
||||
let mut resolved_map = BTreeMap::new();
|
||||
for pdu in &body.pdus {
|
||||
'main_pdu_loop: for pdu in &body.pdus {
|
||||
// 1. Is a valid event, otherwise it is dropped.
|
||||
// Ruma/PduEvent/StateEvent satisfies this
|
||||
// We do not add the event_id field to the pdu here because of signature and hashes checks
|
||||
|
@ -660,7 +660,6 @@ pub async fn send_transaction_message_route<'a>(
|
|||
};
|
||||
|
||||
let pdu = Arc::new(pdu.clone());
|
||||
|
||||
// Fetch any unknown prev_events or retrieve them from the DB
|
||||
let previous = match fetch_events(
|
||||
&db,
|
||||
|
@ -675,6 +674,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
_ => None,
|
||||
};
|
||||
|
||||
// [auth_cache] At this point we have the auth chain of the incoming event.
|
||||
let mut event_map: state_res::EventMap<Arc<PduEvent>> = auth_cache
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
|
@ -688,7 +688,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
&pdu.auth_events
|
||||
.iter()
|
||||
.map(|id| {
|
||||
event_map
|
||||
auth_cache
|
||||
.get(id)
|
||||
.map(|pdu| ((pdu.kind(), pdu.state_key()), pdu.clone()))
|
||||
.ok_or_else(|| {
|
||||
|
@ -790,7 +790,15 @@ pub async fn send_transaction_message_route<'a>(
|
|||
// End of step 5.
|
||||
|
||||
// Gather the forward extremities and resolve
|
||||
let fork_states = match forward_extremity_ids(&db, &pdu) {
|
||||
let fork_states = match forward_extremity_ids(
|
||||
&db,
|
||||
&pdu,
|
||||
server_name,
|
||||
&pub_key_map,
|
||||
&mut auth_cache,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(states) => states,
|
||||
Err(_) => {
|
||||
resolved_map.insert(event_id, Err("Failed to gather forward extremities".into()));
|
||||
|
@ -805,47 +813,44 @@ pub async fn send_transaction_message_route<'a>(
|
|||
} else if fork_states.len() == 1 {
|
||||
fork_states[0].clone()
|
||||
} else {
|
||||
// TODO: remove this is for current debugging Jan, 15 2021
|
||||
let mut number_fetches = 0_u32;
|
||||
let mut auth_events = vec![];
|
||||
// this keeps track if we error so we can break out of these inner loops
|
||||
// to continue on with the incoming PDU's
|
||||
let mut failed = false;
|
||||
for map in &fork_states {
|
||||
let mut state_auth = vec![];
|
||||
for pdu in map.values() {
|
||||
let event = match auth_cache.get(pdu.event_id()) {
|
||||
for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) {
|
||||
let event = match auth_cache.get(auth_id) {
|
||||
Some(aev) => aev.clone(),
|
||||
// We should know about every event at this point but just incase...
|
||||
None => match fetch_events(
|
||||
&db,
|
||||
server_name,
|
||||
&pub_key_map,
|
||||
&[pdu.event_id().clone()],
|
||||
&[auth_id.clone()],
|
||||
&mut auth_cache,
|
||||
)
|
||||
.await
|
||||
.map(|mut vec| vec.remove(0))
|
||||
{
|
||||
Ok(aev) => aev.clone(),
|
||||
.map(|mut vec| {
|
||||
number_fetches += 1;
|
||||
vec.remove(0)
|
||||
}) {
|
||||
Ok(aev) => aev,
|
||||
Err(_) => {
|
||||
resolved_map.insert(
|
||||
event_id.clone(),
|
||||
Err("Event has been soft failed".into()),
|
||||
);
|
||||
failed = true;
|
||||
break;
|
||||
continue 'main_pdu_loop;
|
||||
}
|
||||
},
|
||||
};
|
||||
state_auth.push(event);
|
||||
}
|
||||
if failed {
|
||||
break;
|
||||
}
|
||||
auth_events.push(state_auth);
|
||||
}
|
||||
if failed {
|
||||
continue;
|
||||
}
|
||||
info!("{} event's were not in the auth_cache", number_fetches);
|
||||
|
||||
// Add everything we will need to event_map
|
||||
event_map.extend(
|
||||
|
@ -886,7 +891,13 @@ pub async fn send_transaction_message_route<'a>(
|
|||
.into_iter()
|
||||
.map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap())))
|
||||
.collect(),
|
||||
Err(e) => panic!("{:?}", e),
|
||||
Err(_) => {
|
||||
resolved_map.insert(
|
||||
pdu.event_id().clone(),
|
||||
Err("State resolution failed, either an event could not be found or deserialization".into()),
|
||||
);
|
||||
continue 'main_pdu_loop;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -914,6 +925,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
Ok(dbg!(send_transaction_message::v1::Response { pdus: resolved_map }).into())
|
||||
}
|
||||
|
||||
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
|
||||
/// Validate any event that is given to us by another server.
|
||||
///
|
||||
/// 1. Is a valid event, otherwise it is dropped (PduEvent deserialization satisfies this).
|
||||
|
@ -955,6 +967,37 @@ fn validate_event<'a>(
|
|||
})
|
||||
}
|
||||
|
||||
/// TODO: don't add as outlier if event is fetched as a result of gathering auth_events
|
||||
/// The check in `fetch_check_auth_events` is that a complete chain is found for the
|
||||
/// events `auth_events`. If the chain is found to have any missing events it fails.
|
||||
///
|
||||
/// The `auth_cache` is filled instead of returning a `Vec`.
|
||||
async fn fetch_check_auth_events(
|
||||
db: &Database,
|
||||
origin: &ServerName,
|
||||
key_map: &PublicKeyMap,
|
||||
event_ids: &[EventId],
|
||||
auth_cache: &mut EventMap<Arc<PduEvent>>,
|
||||
) -> Result<()> {
|
||||
let mut stack = event_ids.to_vec();
|
||||
|
||||
// DFS for auth event chain
|
||||
while !stack.is_empty() {
|
||||
let ev_id = stack.pop().unwrap();
|
||||
if auth_cache.contains_key(&ev_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
|
||||
.await
|
||||
.map(|mut vec| vec.remove(0))?;
|
||||
|
||||
stack.extend(ev.auth_events());
|
||||
auth_cache.insert(ev.event_id().clone(), ev);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Find the event and auth it.
|
||||
///
|
||||
/// 1. Look in the main timeline (pduid_pdu tree)
|
||||
|
@ -1000,36 +1043,6 @@ async fn fetch_events(
|
|||
Ok(pdus)
|
||||
}
|
||||
|
||||
/// The check in `fetch_check_auth_events` is that a complete chain is found for the
|
||||
/// events `auth_events`. If the chain is found to have any missing events it fails.
|
||||
///
|
||||
/// The `auth_cache` is filled instead of returning a `Vec`.
|
||||
async fn fetch_check_auth_events(
|
||||
db: &Database,
|
||||
origin: &ServerName,
|
||||
key_map: &PublicKeyMap,
|
||||
event_ids: &[EventId],
|
||||
auth_cache: &mut EventMap<Arc<PduEvent>>,
|
||||
) -> Result<()> {
|
||||
let mut stack = event_ids.to_vec();
|
||||
|
||||
// DFS for auth event chain
|
||||
while !stack.is_empty() {
|
||||
let ev_id = stack.pop().unwrap();
|
||||
if auth_cache.contains_key(&ev_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let ev = fetch_events(db, origin, key_map, &[ev_id.clone()], auth_cache)
|
||||
.await
|
||||
.map(|mut vec| vec.remove(0))?;
|
||||
|
||||
stack.extend(ev.auth_events());
|
||||
auth_cache.insert(ev.event_id().clone(), ev);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Search the DB for the signing keys of the given server, if we don't have them
|
||||
/// fetch them from the server and save to our DB.
|
||||
async fn fetch_signing_keys(
|
||||
|
@ -1049,6 +1062,7 @@ async fn fetch_signing_keys(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn signature_and_hash_check(
|
||||
pub_key_map: &ruma::signatures::PublicKeyMap,
|
||||
value: CanonicalJsonObject,
|
||||
|
@ -1073,9 +1087,23 @@ fn signature_and_hash_check(
|
|||
)
|
||||
}
|
||||
|
||||
fn forward_extremity_ids(db: &Database, pdu: &PduEvent) -> Result<Vec<StateMap<Arc<PduEvent>>>> {
|
||||
async fn forward_extremity_ids(
|
||||
db: &Database,
|
||||
pdu: &PduEvent,
|
||||
origin: &ServerName,
|
||||
pub_key_map: &PublicKeyMap,
|
||||
auth_cache: &mut EventMap<Arc<PduEvent>>,
|
||||
) -> Result<Vec<StateMap<Arc<PduEvent>>>> {
|
||||
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
|
||||
|
||||
for incoming_leaf in &pdu.prev_events {
|
||||
if !current_leaves.contains(incoming_leaf) {
|
||||
current_leaves.push(incoming_leaf.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let mut fork_states = vec![];
|
||||
for id in &db.rooms.get_pdu_leaves(pdu.room_id())? {
|
||||
for id in ¤t_leaves {
|
||||
if let Some(id) = db.rooms.get_pdu_id(id)? {
|
||||
let state_hash = db
|
||||
.rooms
|
||||
|
@ -1090,11 +1118,32 @@ fn forward_extremity_ids(db: &Database, pdu: &PduEvent) -> Result<Vec<StateMap<A
|
|||
|
||||
fork_states.push(state);
|
||||
} else {
|
||||
return Err(Error::Conflict(
|
||||
"we don't know of a pdu that is part of our known forks OOPS",
|
||||
));
|
||||
let res = db
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&db.globals,
|
||||
origin,
|
||||
get_room_state_ids::v1::Request {
|
||||
room_id: pdu.room_id(),
|
||||
event_id: id,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// TODO: This only adds events to the auth_cache, there is for sure a better way to
|
||||
// do this...
|
||||
fetch_events(&db, origin, &pub_key_map, &res.auth_chain_ids, auth_cache).await?;
|
||||
|
||||
let state = fetch_events(&db, origin, &pub_key_map, &res.pdu_ids, auth_cache)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
|
||||
.collect();
|
||||
|
||||
fork_states.push(state);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(fork_states)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue