WIP gather and update forward extremities

next
Devin Ragotzy 2021-01-18 19:08:59 -05:00
parent b1ae2bbeb2
commit c65bde41e0
3 changed files with 132 additions and 45 deletions

@@ -397,6 +397,24 @@ impl Rooms {
         Ok(events)
     }
 
+    /// Force an update to the leaves of a room.
+    pub fn force_pdu_leaves(&self, room_id: &RoomId, event_ids: &[EventId]) -> Result<()> {
+        let mut prefix = room_id.as_bytes().to_vec();
+        prefix.push(0xff);
+
+        for key in self.roomid_pduleaves.scan_prefix(&prefix).keys() {
+            self.roomid_pduleaves.remove(key?)?;
+        }
+
+        for event_id in event_ids.iter() {
+            let mut key = prefix.to_owned();
+            key.extend_from_slice(event_id.as_bytes());
+            self.roomid_pduleaves.insert(&key, event_id.as_bytes())?;
+        }
+
+        Ok(())
+    }
+
     /// Replace the leaves of a room with a new event.
     pub fn replace_pdu_leaves(&self, room_id: &RoomId, event_id: &EventId) -> Result<()> {
        let mut prefix = room_id.as_bytes().to_vec();
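
For illustration, the pattern behind force_pdu_leaves is "wipe everything under the room's prefix, then write one key per new leaf", with keys laid out as room_id ++ 0xff ++ event_id. The snippet below models that with a plain std BTreeMap standing in for the sled roomid_pduleaves tree; the function name and types are illustrative only, not the real implementation.

use std::collections::BTreeMap;

/// Stand-in for the `roomid_pduleaves` tree: keys are `room_id ++ 0xff ++ event_id`.
type Tree = BTreeMap<Vec<u8>, Vec<u8>>;

/// Illustrative version of the "force" update: delete every key under the room's
/// prefix, then write one key per new leaf.
fn force_leaves(tree: &mut Tree, room_id: &str, event_ids: &[&str]) {
    let mut prefix = room_id.as_bytes().to_vec();
    prefix.push(0xff);

    // Remove all existing leaves for this room (prefix scan + delete).
    let stale: Vec<Vec<u8>> = tree
        .keys()
        .filter(|k| k.starts_with(&prefix))
        .cloned()
        .collect();
    for key in stale {
        tree.remove(&key);
    }

    // Insert the new set of leaves.
    for event_id in event_ids {
        let mut key = prefix.clone();
        key.extend_from_slice(event_id.as_bytes());
        tree.insert(key, event_id.as_bytes().to_vec());
    }
}

fn main() {
    let mut tree = Tree::new();
    force_leaves(&mut tree, "!room:server", &["$old:server"]);
    force_leaves(&mut tree, "!room:server", &["$a:server", "$b:server"]);
    assert_eq!(tree.len(), 2); // only the latest leaf set remains
}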

@@ -9,7 +9,7 @@ use ruma::{
 };
 use serde::{Deserialize, Serialize};
 use serde_json::json;
-use std::{collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH};
+use std::{cmp::Ordering, collections::BTreeMap, convert::TryFrom, time::UNIX_EPOCH};
 
 #[derive(Clone, Deserialize, Serialize, Debug)]
 pub struct PduEvent {
@@ -284,6 +284,25 @@ impl state_res::Event for PduEvent {
     }
 }
 
+// These impls allow us to dedup state snapshots when resolving state
+// for incoming events (federation/send/{txn}).
+impl Eq for PduEvent {}
+impl PartialEq for PduEvent {
+    fn eq(&self, other: &Self) -> bool {
+        self.event_id == other.event_id
+    }
+}
+impl PartialOrd for PduEvent {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.event_id.partial_cmp(&other.event_id)
+    }
+}
+impl Ord for PduEvent {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.event_id.cmp(&other.event_id)
+    }
+}
+
 /// Generates a correct eventId for the incoming pdu.
 ///
 /// Returns a tuple of the new `EventId` and the PDU as a `BTreeMap<String, CanonicalJsonValue>`.
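
To see what these impls buy, here is a small self-contained sketch: because equality and ordering are derived from event_id alone, a BTreeSet of whole state snapshots collapses duplicates, which is exactly what the fork_states set in the send route relies on. The struct and the Vec standing in for the StateMap are illustrative only.

use std::cmp::Ordering;
use std::collections::BTreeSet;

// Simplified stand-in for PduEvent: ordering and equality come from the event id
// alone, so two snapshots containing the same events compare equal.
#[derive(Debug, Clone)]
struct Pdu {
    event_id: String,
}

impl PartialEq for Pdu {
    fn eq(&self, other: &Self) -> bool {
        self.event_id == other.event_id
    }
}
impl Eq for Pdu {}
impl PartialOrd for Pdu {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Pdu {
    fn cmp(&self, other: &Self) -> Ordering {
        self.event_id.cmp(&other.event_id)
    }
}

fn main() {
    // Two "state snapshots" that contain the same event ids.
    let snapshot_a = vec![Pdu { event_id: "$a:server".into() }, Pdu { event_id: "$b:server".into() }];
    let snapshot_b = vec![Pdu { event_id: "$a:server".into() }, Pdu { event_id: "$b:server".into() }];

    // A BTreeSet of snapshots keeps only one copy: the dedup the impls provide.
    let mut fork_states: BTreeSet<Vec<Pdu>> = BTreeSet::new();
    fork_states.insert(snapshot_a);
    fork_states.insert(snapshot_b);
    assert_eq!(fork_states.len(), 1);
}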

@@ -5,7 +5,6 @@ use log::{error, info, warn};
 use rocket::{get, post, put, response::content::Json, State};
 use ruma::{
     api::{
-        client::r0::state,
         federation::{
             directory::{get_public_rooms, get_public_rooms_filtered},
             discovery::{
@@ -25,7 +24,7 @@ use ruma::{
 };
 use state_res::{Event, EventMap, StateMap};
 use std::{
-    collections::{BTreeMap, BTreeSet},
+    collections::{BTreeMap, BTreeSet, HashSet},
     convert::TryFrom,
     fmt::Debug,
     future::Future,
@@ -600,13 +599,13 @@ pub async fn send_transaction_message_route<'a>(
     let server_name = &body.body.origin;
 
     let mut pub_key_map = BTreeMap::new();
 
-        if let Some(sig) = value.get("signatures") {
-            match sig {
-                CanonicalJsonValue::Object(entity) => {
-                    for key in entity.keys() {
-                        // TODO: save this in a DB maybe...
-                        // fetch the public signing key
-                        let origin = <&ServerName>::try_from(key.as_str()).unwrap();
+        if let Some(CanonicalJsonValue::String(sender)) = value.get("sender") {
+            let sender =
+                UserId::try_from(sender.as_str()).expect("All PDUs have a valid sender field");
+            let origin = sender.server_name();
+
+            // TODO: this could fail or the server might not respond...
             let keys = fetch_signing_keys(&db, origin).await?;
 
             pub_key_map.insert(
@@ -615,16 +614,6 @@ pub async fn send_transaction_message_route<'a>(
                     .map(|(k, v)| (k.to_string(), v.key))
                     .collect(),
             );
-                    }
-                }
-                _ => {
-                    resolved_map.insert(
-                        event_id,
-                        Err("`signatures` is not a JSON object".to_string()),
-                    );
-                    continue;
-                }
-            }
         } else {
             resolved_map.insert(event_id, Err("No field `signatures` in JSON".to_string()));
             continue;
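
The key-fetch change above switches the lookup origin from the keys of the signatures object to the PDU's sender (via ruma's UserId and its server_name()). The std-only sketch below only shows the underlying idea, deriving the server name from the MXID as everything after the first colon; it is a simplification, not the real parsing code.

// Minimal sketch (std only) of where the signing-key lookup now gets its origin:
// take the PDU's `sender` field and derive the server name from it.
// The real code uses ruma's UserId/ServerName types.
fn origin_from_sender(sender: &str) -> Option<&str> {
    // "@user:example.org" -> "example.org"
    sender.strip_prefix('@')?.splitn(2, ':').nth(1)
}

fn main() {
    assert_eq!(origin_from_sender("@alice:example.org"), Some("example.org"));
    // The keys for "example.org" would then be fetched via fetch_signing_keys
    // and cached in pub_key_map under that server name.
    assert_eq!(origin_from_sender("not-a-user-id"), None);
}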
@@ -642,8 +631,9 @@ pub async fn send_transaction_message_route<'a>(
     // 4. reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
     // 5. reject "due to auth events" if the event doesn't pass auth based on the auth events
     // 7. if not timeline event: stop
-    // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
-    let (pdu, previous) = match validate_event(
+    // TODO: 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
+    // the events found in step 8 can be authed/resolved and appended to the DB
+    let (pdu, previous): (_, Vec<Arc<PduEvent>>) = match validate_event(
         &db,
         value,
         event_id.clone(),
@@ -670,6 +660,9 @@ pub async fn send_transaction_message_route<'a>(
     // 6. persist the event as an outlier.
     db.rooms.append_pdu_outlier(pdu.event_id(), &pdu)?;
 
+    // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
+    // the checks in this list starting at 1. These are not timeline events.
+    //
     // Step 10. check the auth of the event passes based on the calculated state of the event
     let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
         match db
@@ -771,8 +764,12 @@ pub async fn send_transaction_message_route<'a>(
         );
     };
 
-    // Gather the forward extremities and resolve
-    let fork_states = match forward_extremities(
+    // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated it by doing state res
+    // where one of the inputs was a previously trusted set of state; don't just trust a set of state we got from a remote).
+    //
+    // calculate_forward_extremities takes care of adding the current state if it is not already in the state sets;
+    // it also calculates the new pdu leaves for the `roomid_pduleaves` DB tree.
+    let (mut fork_states, fork_ids) = match calculate_forward_extremities(
         &db,
         &pdu,
         server_name,
@@ -788,6 +785,12 @@ pub async fn send_transaction_message_route<'a>(
         }
     };
 
+    // Add the state at the incoming event to the mix of state snapshots.
+    // Since we are using a BTreeSet (yes, this may be overkill) we guarantee unique state sets.
+    fork_states.insert(state_at_event.clone());
+
+    let fork_states = fork_states.into_iter().collect::<Vec<_>>();
+
     // 13. start state-res with all previous forward extremities minus the ones that are in
     // the prev_events of this event plus the new one created by this event and use
     // the result as the new room state
@@ -901,7 +904,9 @@ pub async fn send_transaction_message_route<'a>(
             Err("Event has been soft failed".into()),
         );
     } else {
-        append_state(&db, &pdu)?;
+        // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
+        append_state(&db, &pdu, &fork_ids)?;
+
         // Event has passed all auth/stateres checks
         resolved_map.insert(pdu.event_id().clone(), Ok(()));
     }
@@ -1106,25 +1111,52 @@ async fn fetch_signing_keys(
 /// Gather all state snapshots needed to resolve the current state of the room.
 ///
 /// Step 11. ensure that the state is derived from the previous current state (i.e. we calculated it by doing state res
-/// where one of the inputs was a previously trusted set of state; don't just trust a set of state we got from a remote)
-async fn forward_extremities(
+/// where one of the inputs was a previously trusted set of state; don't just trust a set of state we got from a remote).
+///
+/// The state snapshot of the incoming event __needs__ to be added to the resulting list.
+async fn calculate_forward_extremities(
     db: &Database,
     pdu: &PduEvent,
     origin: &ServerName,
     pub_key_map: &PublicKeyMap,
     auth_cache: &mut EventMap<Arc<PduEvent>>,
-) -> Result<Vec<StateMap<Arc<PduEvent>>>> {
+) -> Result<(BTreeSet<StateMap<Arc<PduEvent>>>, Vec<EventId>)> {
     let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
 
+    let mut is_incoming_leaf = true;
+    // Make sure the incoming event is not already a forward extremity.
+    // FIXME: I think this could happen if different servers send us the same event??
+    if current_leaves.contains(pdu.event_id()) {
+        is_incoming_leaf = false;
+        // Not sure what to do here
+    }
+
+    // If the incoming event is already referenced by an existing event
+    // then do nothing - it's not a candidate to be a new extremity if
+    // it has been referenced.
+    if already_referenced(db, pdu)? {
+        is_incoming_leaf = false;
+        // This event has been dealt with already??
+    }
+
+    // TODO:
+    // [dendrite] Checks if any other leaves have been referenced and removes them,
+    // but as long as we update the pdu leaves here and for events on our server this
+    // should not be possible.
+
+    // Remove any forward extremities that are referenced by this incoming event's prev_events.
     for incoming_leaf in &pdu.prev_events {
-        if !current_leaves.contains(incoming_leaf) {
-            current_leaves.push(incoming_leaf.clone());
+        if current_leaves.contains(incoming_leaf) {
+            if let Some(pos) = current_leaves.iter().position(|x| *x == *incoming_leaf) {
+                current_leaves.remove(pos);
+            }
         }
     }
 
     let current_hash = db.rooms.current_state_hash(pdu.room_id())?;
 
     let mut includes_current_state = false;
-    let mut fork_states = vec![];
+    let mut fork_states = BTreeSet::new();
     for id in &current_leaves {
         if let Some(id) = db.rooms.get_pdu_id(id)? {
             let state_hash = db
@@ -1142,8 +1174,10 @@ async fn forward_extremities(
                 .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
                 .collect();
 
-            fork_states.push(state);
+            fork_states.insert(state);
         } else {
+            error!("Forward extremity not found... {}", id);
+
             let res = db
                 .sending
                 .send_federation_request(
@@ -1166,25 +1200,37 @@ async fn forward_extremities(
                 .map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), pdu))
                 .collect();
 
-            fork_states.push(state);
+            fork_states.insert(state);
         }
     }
 
+    // Add the incoming event only if it is a leaf; we do this after fetching all the
+    // state since we have already fetched the state of the incoming event, so let's
+    // not do it again!
+    if is_incoming_leaf {
+        current_leaves.push(pdu.event_id().clone());
+    }
+
     // This guarantees that our current room state is included
     if !includes_current_state && current_hash.is_some() {
-        fork_states.push(
+        fork_states.insert(
             db.rooms
                 .state_full(pdu.room_id(), current_hash.as_ref().unwrap())?
                 .into_iter()
                 .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
                 .collect(),
-        )
+        );
     }
 
-    Ok(fork_states)
+    Ok((fork_states, dbg!(current_leaves)))
 }
 
-fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
+/// TODO: we need to know if the event is a prev_event (is this event already referenced in the DAG)
+fn already_referenced(_db: &Database, _pdu: &PduEvent) -> Result<bool> {
+    Ok(false)
+}
+
+fn append_state(db: &Database, pdu: &PduEvent, new_room_leaves: &[EventId]) -> Result<()> {
     let count = db.globals.next_count()?;
     let mut pdu_id = pdu.room_id.as_bytes().to_vec();
     pdu_id.push(0xff);
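
Putting the leaf bookkeeping together: calculate_forward_extremities drops every current extremity referenced by the incoming event's prev_events and appends the incoming event itself when nothing references it yet. Below is a simplified model with event ids as plain strings; since already_referenced is still a stub in this commit, it is passed in as a flag here, and all names are illustrative.

// Rough sketch of the leaf update done in calculate_forward_extremities.
fn new_leaves(
    mut current_leaves: Vec<String>,
    incoming: &str,
    prev_events: &[String],
    already_referenced: bool,
) -> Vec<String> {
    // The incoming event only becomes a leaf if it is not one already and
    // nothing in the DAG references it yet.
    let is_incoming_leaf =
        !current_leaves.iter().any(|l| l.as_str() == incoming) && !already_referenced;

    // Extremities referenced by the incoming event's prev_events stop being leaves.
    current_leaves.retain(|leaf| !prev_events.contains(leaf));

    if is_incoming_leaf {
        current_leaves.push(incoming.to_owned());
    }
    current_leaves
}

fn main() {
    let leaves = vec!["$a:server".to_owned(), "$b:server".to_owned()];
    let prev = vec!["$b:server".to_owned()];
    // "$b" is consumed as a prev_event, "$new" becomes the new extremity.
    assert_eq!(
        new_leaves(leaves, "$new:server", &prev, false),
        vec!["$a:server".to_owned(), "$new:server".to_owned()]
    );
}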
@@ -1195,13 +1241,17 @@ fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?;
 
     db.rooms.append_pdu(
-        &pdu,
+        pdu,
         utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
         count,
         pdu_id.clone().into(),
         &db,
     )?;
 
+    // Update the room leaves after calling append_pdu so that our full set sticks:
+    // append_pdu calls replace_pdu_leaves with only the given event.
+    db.rooms.force_pdu_leaves(pdu.room_id(), new_room_leaves)?;
+
     // We set the room state after inserting the pdu, so that we never have a moment in time
     // where events in the current room state do not exist
     db.rooms.set_room_state(&pdu.room_id, &statehashid)?;
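
The ordering matters because append_pdu internally resets the leaves to just the new event; calling force_pdu_leaves afterwards is what makes the full set computed by calculate_forward_extremities stick. A toy model of that interaction (not the real DB code; both functions below just mutate an in-memory list):

// append_pdu internally calls replace_pdu_leaves with only this event.
fn append_pdu(leaves: &mut Vec<String>, event: &str) {
    *leaves = vec![event.to_owned()];
}

// force_pdu_leaves overwrites the leaves with the full computed set.
fn force_pdu_leaves(leaves: &mut Vec<String>, new_leaves: &[String]) {
    *leaves = new_leaves.to_vec();
}

fn main() {
    let mut leaves = vec!["$old:server".to_owned()];
    let new_room_leaves = vec!["$other:server".to_owned(), "$new:server".to_owned()];

    append_pdu(&mut leaves, "$new:server");
    force_pdu_leaves(&mut leaves, &new_room_leaves);

    // The forced set wins because it was applied last.
    assert_eq!(leaves, new_room_leaves);
}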