Merge branch 'speed' into 'master'
improvement: call /state_ids less often by using state res See merge request famedly/conduit!174
This commit is contained in:
commit
d0d47675d8
1 changed files with 116 additions and 13 deletions
|
@ -964,6 +964,16 @@ pub async fn handle_incoming_pdu<'a>(
|
|||
return Ok(None);
|
||||
}
|
||||
|
||||
if incoming_pdu.origin_server_ts
|
||||
< db.rooms
|
||||
.first_pdu_in_room(&room_id)
|
||||
.map_err(|_| "Error loading first room event.".to_owned())?
|
||||
.expect("Room exists")
|
||||
.origin_server_ts
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
|
||||
let mut graph = HashMap::new();
|
||||
let mut eventid_info = HashMap::new();
|
||||
|
@ -1280,6 +1290,18 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||
return Err("Event has been soft failed".into());
|
||||
}
|
||||
|
||||
let create_event_content =
|
||||
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone())
|
||||
.expect("Raw::from_value always works.")
|
||||
.deserialize()
|
||||
.map_err(|e| {
|
||||
warn!("Invalid create event: {}", e);
|
||||
"Invalid create event in db.".to_owned()
|
||||
})?;
|
||||
|
||||
let room_version_id = &create_event_content.room_version;
|
||||
let room_version = RoomVersion::new(room_version_id).expect("room version is supported");
|
||||
|
||||
// 10. Fetch missing state and auth chain events by calling /state_ids at backwards extremities
|
||||
// doing all the checks in this list starting at 1. These are not timeline events.
|
||||
|
||||
|
@ -1318,7 +1340,100 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||
|
||||
state_at_incoming_event = Some(state);
|
||||
}
|
||||
// TODO: set incoming_auth_events?
|
||||
} else {
|
||||
warn!("Calculating state at event using state res");
|
||||
let mut extremity_sstatehashes = HashMap::new();
|
||||
|
||||
let mut okay = true;
|
||||
for prev_eventid in &incoming_pdu.prev_events {
|
||||
let prev_event = if let Ok(Some(pdu)) = db.rooms.get_pdu(prev_eventid) {
|
||||
pdu
|
||||
} else {
|
||||
okay = false;
|
||||
break;
|
||||
};
|
||||
|
||||
let sstatehash = if let Ok(Some(s)) = db.rooms.pdu_shortstatehash(prev_eventid) {
|
||||
s
|
||||
} else {
|
||||
okay = false;
|
||||
break;
|
||||
};
|
||||
|
||||
extremity_sstatehashes.insert(sstatehash, prev_event);
|
||||
}
|
||||
|
||||
let mut fork_states = Vec::new();
|
||||
|
||||
if okay {
|
||||
for (sstatehash, prev_event) in extremity_sstatehashes {
|
||||
let mut leaf_state = db
|
||||
.rooms
|
||||
.state_full_ids(sstatehash)
|
||||
.map_err(|_| "Failed to ask db for room state.".to_owned())?;
|
||||
|
||||
if let Some(state_key) = &prev_event.state_key {
|
||||
let shortstatekey = db
|
||||
.rooms
|
||||
.get_or_create_shortstatekey(&prev_event.kind, state_key, &db.globals)
|
||||
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
|
||||
leaf_state.insert(shortstatekey, Arc::new(prev_event.event_id.clone()));
|
||||
// Now it's the state after the pdu
|
||||
}
|
||||
|
||||
fork_states.push(leaf_state);
|
||||
}
|
||||
|
||||
let fork_states = &fork_states
|
||||
.into_iter()
|
||||
.map(|map| {
|
||||
map.into_iter()
|
||||
.map(|(k, id)| (db.rooms.get_statekey_from_short(k).map(|k| (k, id))))
|
||||
.collect::<Result<StateMap<_>>>()
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
|
||||
|
||||
let mut auth_chain_sets = Vec::new();
|
||||
for state in fork_states {
|
||||
auth_chain_sets.push(
|
||||
get_auth_chain(state.iter().map(|(_, id)| id.clone()).collect(), db)
|
||||
.map_err(|_| "Failed to load auth chain.".to_owned())?
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
|
||||
state_at_incoming_event = match state_res::StateResolution::resolve(
|
||||
&room_id,
|
||||
room_version_id,
|
||||
&fork_states,
|
||||
auth_chain_sets,
|
||||
|id| {
|
||||
let res = db.rooms.get_pdu(id);
|
||||
if let Err(e) = &res {
|
||||
error!("LOOK AT ME Failed to fetch event: {}", e);
|
||||
}
|
||||
res.ok().flatten()
|
||||
},
|
||||
) {
|
||||
Ok(new_state) => Some(
|
||||
new_state
|
||||
.into_iter()
|
||||
.map(|((event_type, state_key), event_id)| {
|
||||
let shortstatekey = db
|
||||
.rooms
|
||||
.get_or_create_shortstatekey(&event_type, &state_key, &db.globals)
|
||||
.map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?;
|
||||
Ok((shortstatekey, event_id))
|
||||
})
|
||||
.collect::<StdResult<_, String>>()?,
|
||||
),
|
||||
Err(e) => {
|
||||
warn!("State resolution on prev events failed, either an event could not be found or deserialization: {}", e);
|
||||
None
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if state_at_incoming_event.is_none() {
|
||||
|
@ -1402,18 +1517,6 @@ async fn upgrade_outlier_to_timeline_pdu(
|
|||
state_at_incoming_event.expect("we always set this to some above");
|
||||
|
||||
// 11. Check the auth of the event passes based on the state of the event
|
||||
let create_event_content =
|
||||
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone())
|
||||
.expect("Raw::from_value always works.")
|
||||
.deserialize()
|
||||
.map_err(|e| {
|
||||
warn!("Invalid create event: {}", e);
|
||||
"Invalid create event in db.".to_owned()
|
||||
})?;
|
||||
|
||||
let room_version_id = &create_event_content.room_version;
|
||||
let room_version = RoomVersion::new(room_version_id).expect("room version is supported");
|
||||
|
||||
// If the previous event was the create event special rules apply
|
||||
let previous_create = if incoming_pdu.auth_events.len() == 1
|
||||
&& incoming_pdu.prev_events == incoming_pdu.auth_events
|
||||
|
|
Loading…
Reference in a new issue