fix: multiple federation/pusher fixes

next
Timo Kösters 2021-03-16 18:00:26 +01:00
parent 21f785d530
commit 44425a903a
No known key found for this signature in database
GPG Key ID: 24DA7517711A2BA4
7 changed files with 85 additions and 167 deletions

View File

@ -686,10 +686,10 @@ pub async fn get_pushers_route(
db: State<'_, Database>, db: State<'_, Database>,
body: Ruma<get_pushers::Request>, body: Ruma<get_pushers::Request>,
) -> ConduitResult<get_pushers::Response> { ) -> ConduitResult<get_pushers::Response> {
let sender = body.sender_user.as_ref().expect("authenticated endpoint"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
Ok(get_pushers::Response { Ok(get_pushers::Response {
pushers: db.pusher.get_pusher(sender)?, pushers: db.pusher.get_pusher(sender_user)?,
} }
.into()) .into())
} }
@ -703,10 +703,10 @@ pub async fn set_pushers_route(
db: State<'_, Database>, db: State<'_, Database>,
body: Ruma<set_pusher::Request>, body: Ruma<set_pusher::Request>,
) -> ConduitResult<set_pusher::Response> { ) -> ConduitResult<set_pusher::Response> {
let sender = body.sender_user.as_ref().expect("authenticated endpoint"); let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let pusher = body.pusher.clone(); let pusher = body.pusher.clone();
db.pusher.set_pusher(sender, pusher)?; db.pusher.set_pusher(sender_user, pusher)?;
db.flush().await?; db.flush().await?;

View File

@ -315,7 +315,7 @@ pub async fn sync_events_route(
(None, None, Vec::new()) (None, None, Vec::new())
}; };
let state_events = if dbg!(joined_since_last_sync) { let state_events = if joined_since_last_sync {
current_state current_state
.into_iter() .into_iter()
.map(|(_, pdu)| pdu.to_sync_state_event()) .map(|(_, pdu)| pdu.to_sync_state_event())
@ -703,12 +703,7 @@ pub async fn sync_events_route(
if duration.as_secs() > 30 { if duration.as_secs() > 30 {
duration = Duration::from_secs(30); duration = Duration::from_secs(30);
} }
let delay = tokio::time::sleep(duration); let _ = tokio::time::timeout(duration, watcher).await;
tokio::pin!(delay);
tokio::select! {
_ = &mut delay, if delay.is_elapsed() => {}
_ = watcher => {}
}
} }
Ok(response.into()) Ok(response.into())

View File

@ -35,8 +35,6 @@ impl PushData {
} }
pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> { pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> {
println!("CCCCCCCCCCCCCCCCCCCCCc");
dbg!(&pusher);
let mut key = sender.as_bytes().to_vec(); let mut key = sender.as_bytes().to_vec();
key.push(0xff); key.push(0xff);
key.extend_from_slice(pusher.pushkey.as_bytes()); key.extend_from_slice(pusher.pushkey.as_bytes());
@ -51,7 +49,7 @@ impl PushData {
} }
self.senderkey_pusher.insert( self.senderkey_pusher.insert(
dbg!(key), key,
&*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"), &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"),
)?; )?;
@ -63,12 +61,10 @@ impl PushData {
prefix.push(0xff); prefix.push(0xff);
self.senderkey_pusher self.senderkey_pusher
.scan_prefix(dbg!(prefix)) .scan_prefix(prefix)
.values() .values()
.map(|push| { .map(|push| {
println!("DDDDDDDDDDDDDDDDDDDDDDDDDD"); let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?;
let push =
dbg!(push).map_err(|_| Error::bad_database("Invalid push bytes in db."))?;
Ok(serde_json::from_slice(&*push) Ok(serde_json::from_slice(&*push)
.map_err(|_| Error::bad_database("Invalid Pusher in db."))?) .map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
}) })
@ -100,10 +96,7 @@ where
//*reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); //*reqwest_request.timeout_mut() = Some(Duration::from_secs(5));
let url = reqwest_request.url().clone(); let url = reqwest_request.url().clone();
let reqwest_response = globals let reqwest_response = globals.reqwest_client().execute(reqwest_request).await;
.reqwest_client()
.execute(dbg!(reqwest_request))
.await;
// Because reqwest::Response -> http::Response is complicated: // Because reqwest::Response -> http::Response is complicated:
match reqwest_response { match reqwest_response {
@ -182,7 +175,7 @@ pub async fn send_push_notice(
continue; continue;
} }
match dbg!(rule.rule_id.as_str()) { match rule.rule_id.as_str() {
".m.rule.master" => {} ".m.rule.master" => {}
".m.rule.suppress_notices" => { ".m.rule.suppress_notices" => {
if pdu.kind == EventType::RoomMessage if pdu.kind == EventType::RoomMessage
@ -454,8 +447,7 @@ async fn send_notice(
db: &Database, db: &Database,
name: &str, name: &str,
) -> Result<()> { ) -> Result<()> {
println!("BBBBBBBBBBBBBBBr"); let (http, _emails): (Vec<&Pusher>, _) = pushers
let (http, _emails): (Vec<&Pusher>, _) = dbg!(pushers)
.iter() .iter()
.partition(|pusher| pusher.kind == Some(PusherKind::Http)); .partition(|pusher| pusher.kind == Some(PusherKind::Http));
@ -463,7 +455,7 @@ async fn send_notice(
// Two problems with this // Two problems with this
// 1. if "event_id_only" is the only format kind it seems we should never add more info // 1. if "event_id_only" is the only format kind it seems we should never add more info
// 2. can pusher/devices have conflicting formats // 2. can pusher/devices have conflicting formats
for pusher in dbg!(http) { for pusher in http {
let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
let url = if let Some(url) = pusher.data.url.as_ref() { let url = if let Some(url) = pusher.data.url.as_ref() {
url url

View File

@ -3,7 +3,7 @@ mod edus;
pub use edus::RoomEdus; pub use edus::RoomEdus;
use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result};
use log::{error, info, warn}; use log::{debug, error, info, warn};
use regex::Regex; use regex::Regex;
use ring::digest; use ring::digest;
use ruma::{ use ruma::{
@ -67,7 +67,7 @@ pub struct Rooms {
/// StateKey = EventType + StateKey, Short = Count /// StateKey = EventType + StateKey, Short = Count
pub(super) statekey_short: sled::Tree, pub(super) statekey_short: sled::Tree,
/// StateId = StateHash + Short, PduId = Count (without roomid) /// StateId = StateHash + Short, PduId = Count (without roomid)
pub(super) stateid_pduid: sled::Tree, pub(super) stateid_eventid: sled::Tree,
/// RoomId + EventId -> outlier PDU. /// RoomId + EventId -> outlier PDU.
/// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.
@ -138,7 +138,7 @@ impl Rooms {
key.push(0xff); key.push(0xff);
key.extend_from_slice(&state_key.as_bytes()); key.extend_from_slice(&state_key.as_bytes());
info!("Looking for {} {:?}", event_type, state_key); debug!("Looking for {} {:?}", event_type, state_key);
let short = self.statekey_short.get(&key)?; let short = self.statekey_short.get(&key)?;
@ -147,11 +147,11 @@ impl Rooms {
stateid.push(0xff); stateid.push(0xff);
stateid.extend_from_slice(&short); stateid.extend_from_slice(&short);
info!("trying to find pduid/eventid. short: {:?}", stateid); debug!("trying to find pduid/eventid. short: {:?}", stateid);
self.stateid_pduid self.stateid_pduid
.get(&stateid)? .get(&stateid)?
.map_or(Ok(None), |short_id| { .map_or(Ok(None), |short_id| {
info!("found in stateid_pduid"); debug!("found in stateid_pduid");
let mut long_id = room_id.as_bytes().to_vec(); let mut long_id = room_id.as_bytes().to_vec();
long_id.push(0xff); long_id.push(0xff);
long_id.extend_from_slice(&short_id); long_id.extend_from_slice(&short_id);
@ -163,7 +163,7 @@ impl Rooms {
.map_err(|_| Error::bad_database("Invalid PDU in db."))?, .map_err(|_| Error::bad_database("Invalid PDU in db."))?,
), ),
None => { None => {
info!("looking in outliers"); debug!("looking in outliers");
( (
short_id.clone().into(), short_id.clone().into(),
self.eventid_outlierpdu self.eventid_outlierpdu
@ -180,7 +180,7 @@ impl Rooms {
})) }))
}) })
} else { } else {
info!("short id not found"); warn!("short id not found");
Ok(None) Ok(None)
} }
} }
@ -288,7 +288,7 @@ impl Rooms {
let mut state_id = prefix.clone(); let mut state_id = prefix.clone();
state_id.extend_from_slice(&short.to_be_bytes()); state_id.extend_from_slice(&short.to_be_bytes());
info!("inserting {:?} into {:?}", short_id, state_id); debug!("inserting {:?} into {:?}", short_id, state_id);
self.stateid_pduid.insert(state_id, short_id)?; self.stateid_pduid.insert(state_id, short_id)?;
} }
@ -574,7 +574,7 @@ impl Rooms {
self.pduid_pdu.insert( self.pduid_pdu.insert(
&pdu_id, &pdu_id,
&*serde_json::to_string(&pdu_json) &*serde_json::to_string(dbg!(&pdu_json))
.expect("CanonicalJsonObject is always a valid String"), .expect("CanonicalJsonObject is always a valid String"),
)?; )?;
@ -889,12 +889,12 @@ impl Rooms {
content.clone(), content.clone(),
prev_event, prev_event,
None, // TODO: third party invite None, // TODO: third party invite
dbg!(&auth_events &auth_events
.iter() .iter()
.map(|((ty, key), pdu)| { .map(|((ty, key), pdu)| {
Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone()))) Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone())))
}) })
.collect::<Result<StateMap<_>>>()?), .collect::<Result<StateMap<_>>>()?,
) )
.map_err(|e| { .map_err(|e| {
log::error!("{}", e); log::error!("{}", e);

View File

@ -10,7 +10,7 @@ use crate::{
appservice_server, database::pusher, server_server, utils, Database, Error, PduEvent, Result, appservice_server, database::pusher, server_server, utils, Database, Error, PduEvent, Result,
}; };
use federation::transactions::send_transaction_message; use federation::transactions::send_transaction_message;
use log::{info, warn}; use log::{debug, error, info, warn};
use ring::digest; use ring::digest;
use rocket::futures::stream::{FuturesUnordered, StreamExt}; use rocket::futures::stream::{FuturesUnordered, StreamExt};
use ruma::{ use ruma::{
@ -308,8 +308,6 @@ impl Sending {
key.extend_from_slice(pdu_id); key.extend_from_slice(pdu_id);
self.servernamepduids.insert(key, b"")?; self.servernamepduids.insert(key, b"")?;
println!("AAAA");
Ok(()) Ok(())
} }
@ -348,7 +346,7 @@ impl Sending {
pdu_ids: Vec<IVec>, pdu_ids: Vec<IVec>,
db: &Database, db: &Database,
) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> { ) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> {
match dbg!(&kind) { match &kind {
OutgoingKind::Appservice(server) => { OutgoingKind::Appservice(server) => {
let pdu_jsons = pdu_ids let pdu_jsons = pdu_ids
.iter() .iter()
@ -414,21 +412,23 @@ impl Sending {
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
for pdu in dbg!(&pdus) { for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them) // Redacted events are not notification targets (we don't send push for them)
if pdu.unsigned.get("redacted_because").is_some() { if pdu.unsigned.get("redacted_because").is_some() {
continue; continue;
} }
for user in db.rooms.room_members(&pdu.room_id) { for user in db.users.iter().filter_map(|r| r.ok()).filter(|user_id| {
let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false)
}) {
// Don't notify the user of their own events // Don't notify the user of their own events
if user == pdu.sender { if user == pdu.sender {
continue; continue;
} }
let pushers = dbg!(db.pusher.get_pusher(&user)) let pushers = db
.pusher
.get_pusher(&user)
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; .map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
let rules_for_user = db let rules_for_user = db
@ -467,7 +467,7 @@ impl Sending {
unread, unread,
&pushers, &pushers,
rules_for_user, rules_for_user,
pdu, &pdu,
db, db,
) )
.await .await
@ -510,7 +510,7 @@ impl Sending {
let permit = db.sending.maximum_requests.acquire().await; let permit = db.sending.maximum_requests.acquire().await;
info!("sending pdus to {}: {:#?}", server, pdu_jsons); error!("sending pdus to {}: {:#?}", server, pdu_jsons);
let response = server_server::send_request( let response = server_server::send_request(
&db.globals, &db.globals,
&*server, &*server,
@ -527,7 +527,7 @@ impl Sending {
) )
.await .await
.map(|response| { .map(|response| {
info!("server response: {:?}", response); error!("server response: {:?}", response);
kind.clone() kind.clone()
}) })
.map_err(|e| (kind, e)); .map_err(|e| (kind, e));

View File

@ -111,63 +111,3 @@ where
.respond_to(r) .respond_to(r)
} }
} }
pub struct ConduitLogger {
pub db: Database,
pub last_logs: RwLock<HashMap<String, Instant>>,
}
impl log::Log for ConduitLogger {
fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
true
}
fn log(&self, record: &log::Record<'_>) {
let output = format!("{} - {}", record.level(), record.args());
let match_mod_path =
|path: &str| path.starts_with("conduit::") || path.starts_with("state");
if self.enabled(record.metadata())
&& (record.module_path().map_or(false, match_mod_path)
|| record
.module_path()
.map_or(true, |path| !path.starts_with("rocket::")) // Rockets logs are annoying
&& record.metadata().level() <= log::Level::Warn)
{
let first_line = output
.lines()
.next()
.expect("lines always returns one item");
eprintln!("{}", output);
let mute_duration = match record.metadata().level() {
log::Level::Error => Duration::from_secs(60 * 5), // 5 minutes
log::Level::Warn => Duration::from_secs(60 * 60 * 24), // A day
_ => Duration::from_secs(60 * 60 * 24 * 7), // A week
};
if self
.last_logs
.read()
.unwrap()
.get(first_line)
.map_or(false, |i| i.elapsed() < mute_duration)
// Don't post this log again for some time
{
return;
}
if let Ok(mut_last_logs) = &mut self.last_logs.try_write() {
mut_last_logs.insert(first_line.to_owned(), Instant::now());
}
self.db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::notice_plain(output),
));
}
}
fn flush(&self) {}
}

View File

@ -1,7 +1,7 @@
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
use get_profile_information::v1::ProfileField; use get_profile_information::v1::ProfileField;
use http::header::{HeaderValue, AUTHORIZATION, HOST}; use http::header::{HeaderValue, AUTHORIZATION, HOST};
use log::{error, info, warn}; use log::{debug, error, info, warn};
use regex::Regex; use regex::Regex;
use rocket::{get, post, put, response::content::Json, State}; use rocket::{get, post, put, response::content::Json, State};
use ruma::{ use ruma::{
@ -27,7 +27,7 @@ use ruma::{
use state_res::{Event, EventMap, StateMap}; use state_res::{Event, EventMap, StateMap};
use std::{ use std::{
collections::{BTreeMap, BTreeSet, HashMap}, collections::{BTreeMap, BTreeSet, HashMap},
convert::TryFrom, convert::{TryFrom, TryInto},
fmt::Debug, fmt::Debug,
future::Future, future::Future,
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
@ -601,7 +601,7 @@ pub async fn send_transaction_message_route<'a>(
// discard the event whereas the Client Server API's /send/{eventType} endpoint // discard the event whereas the Client Server API's /send/{eventType} endpoint
// would return a M_BAD_JSON error. // would return a M_BAD_JSON error.
'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve { 'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve {
info!("Working on incoming pdu: {:?}", value); debug!("Working on incoming pdu: {:?}", value);
let server_name = &body.body.origin; let server_name = &body.body.origin;
let mut pub_key_map = BTreeMap::new(); let mut pub_key_map = BTreeMap::new();
@ -636,11 +636,11 @@ pub async fn send_transaction_message_route<'a>(
continue; continue;
} }
}; };
info!("Validated event."); debug!("Validated event.");
// 6. persist the event as an outlier. // 6. persist the event as an outlier.
db.rooms.add_pdu_outlier(&pdu)?; db.rooms.add_pdu_outlier(&pdu)?;
info!("Added pdu as outlier."); debug!("Added pdu as outlier.");
// Step 9. fetch missing state by calling /state_ids at backwards extremities doing all // Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
// the checks in this list starting at 1. These are not timeline events. // the checks in this list starting at 1. These are not timeline events.
@ -649,7 +649,7 @@ pub async fn send_transaction_message_route<'a>(
// //
// TODO: if we know the prev_events of the incoming event we can avoid the request and build // TODO: if we know the prev_events of the incoming event we can avoid the request and build
// the state from a known point and resolve if > 1 prev_event // the state from a known point and resolve if > 1 prev_event
info!("Requesting state at event."); debug!("Requesting state at event.");
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) = let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
match db match db
.sending .sending
@ -664,7 +664,7 @@ pub async fn send_transaction_message_route<'a>(
.await .await
{ {
Ok(res) => { Ok(res) => {
info!("Fetching state events at event."); debug!("Fetching state events at event.");
let state = match fetch_events( let state = match fetch_events(
&db, &db,
server_name, server_name,
@ -706,7 +706,7 @@ pub async fn send_transaction_message_route<'a>(
Err(_) => continue, Err(_) => continue,
}; };
info!("Fetching auth events of state events at event."); debug!("Fetching auth events of state events at event.");
(state, incoming_auth_events) (state, incoming_auth_events)
} }
Err(_) => { Err(_) => {
@ -735,7 +735,7 @@ pub async fn send_transaction_message_route<'a>(
); );
continue; continue;
} }
info!("Auth check succeeded."); debug!("Auth check succeeded.");
// End of step 10. // End of step 10.
// 12. check if the event passes auth based on the "current state" of the room, if not "soft fail" it // 12. check if the event passes auth based on the "current state" of the room, if not "soft fail" it
@ -746,8 +746,6 @@ pub async fn send_transaction_message_route<'a>(
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v))) .map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect(); .collect();
info!("current state: {:#?}", current_state);
if !state_res::event_auth::auth_check( if !state_res::event_auth::auth_check(
&RoomVersionId::Version6, &RoomVersionId::Version6,
&pdu, &pdu,
@ -764,7 +762,7 @@ pub async fn send_transaction_message_route<'a>(
); );
continue; continue;
}; };
info!("Auth check with current state succeeded."); debug!("Auth check with current state succeeded.");
// Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res // Step 11. Ensure that the state is derived from the previous current state (i.e. we calculated by doing state res
// where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote) // where one of the inputs was a previously trusted set of state, don't just trust a set of state we got from a remote)
@ -773,7 +771,7 @@ pub async fn send_transaction_message_route<'a>(
// it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree. // it also calculates the new pdu leaves for the `roomid_pduleaves` DB Tree.
let extremities = match calculate_forward_extremities(&db, &pdu).await { let extremities = match calculate_forward_extremities(&db, &pdu).await {
Ok(fork_ids) => { Ok(fork_ids) => {
info!("Calculated new forward extremities: {:?}", fork_ids); debug!("Calculated new forward extremities: {:?}", fork_ids);
fork_ids fork_ids
} }
Err(_) => { Err(_) => {
@ -828,20 +826,21 @@ pub async fn send_transaction_message_route<'a>(
for map in &fork_states { for map in &fork_states {
let mut state_auth = vec![]; let mut state_auth = vec![];
for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) { for auth_id in map.values().flat_map(|pdu| &pdu.auth_events) {
let event = match auth_cache.get(auth_id) { match fetch_events(
Some(aev) => aev.clone(), &db,
// The only events that haven't been added to the auth cache are server_name,
// events we have knowledge of previously &mut pub_key_map,
None => { &[auth_id.clone()],
error!("Event was not present in auth_cache {}", auth_id); &mut auth_cache,
resolved_map.insert( )
event_id.clone(), .await
Err("Event was not present in auth cache".into()), {
); // This should always contain exactly one element when Ok
continue 'main_pdu_loop; Ok(events) => state_auth.push(events[0].clone()),
Err(e) => {
debug!("Event was not present: {}", e);
} }
}; }
state_auth.push(event);
} }
auth_events.push(state_auth); auth_events.push(state_auth);
} }
@ -864,7 +863,7 @@ pub async fn send_transaction_message_route<'a>(
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)), .map(|(_, pdu)| (pdu.event_id().clone(), pdu)),
); );
info!("auth events: {:?}", auth_cache); debug!("auth events: {:?}", auth_cache);
let res = match state_res::StateResolution::resolve( let res = match state_res::StateResolution::resolve(
pdu.room_id(), pdu.room_id(),
@ -916,7 +915,7 @@ pub async fn send_transaction_message_route<'a>(
// We use the `state_at_event` instead of `state_after` so we accurately // We use the `state_at_event` instead of `state_after` so we accurately
// represent the state for this event. // represent the state for this event.
append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?; append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
info!("Appended incoming pdu."); debug!("Appended incoming pdu.");
// Set the new room state to the resolved state // Set the new room state to the resolved state
update_resolved_state( update_resolved_state(
@ -928,7 +927,7 @@ pub async fn send_transaction_message_route<'a>(
None None
}, },
)?; )?;
info!("Updated resolved state"); debug!("Updated resolved state");
// Event has passed all auth/stateres checks // Event has passed all auth/stateres checks
} }
@ -972,7 +971,7 @@ fn validate_event<'a>(
} }
.keys() .keys()
{ {
info!("Fetching signing keys for {}", signature_server); debug!("Fetching signing keys for {}", signature_server);
let keys = match fetch_signing_keys( let keys = match fetch_signing_keys(
&db, &db,
&Box::<ServerName>::try_from(&**signature_server).map_err(|_| { &Box::<ServerName>::try_from(&**signature_server).map_err(|_| {
@ -981,10 +980,7 @@ fn validate_event<'a>(
) )
.await .await
{ {
Ok(keys) => { Ok(keys) => keys,
info!("Keys: {:?}", keys);
keys
}
Err(_) => { Err(_) => {
return Err( return Err(
"Signature verification failed: Could not fetch signing key.".to_string(), "Signature verification failed: Could not fetch signing key.".to_string(),
@ -993,8 +989,6 @@ fn validate_event<'a>(
}; };
pub_key_map.insert(signature_server.clone(), keys); pub_key_map.insert(signature_server.clone(), keys);
info!("Fetched signing keys");
} }
let mut val = let mut val =
@ -1026,7 +1020,7 @@ fn validate_event<'a>(
) )
.map_err(|_| "Event is not a valid PDU".to_string())?; .map_err(|_| "Event is not a valid PDU".to_string())?;
info!("Fetching auth events."); debug!("Fetching auth events.");
fetch_check_auth_events(db, origin, pub_key_map, &pdu.auth_events, auth_cache) fetch_check_auth_events(db, origin, pub_key_map, &pdu.auth_events, auth_cache)
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
@ -1035,7 +1029,7 @@ fn validate_event<'a>(
/* /*
// 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events // 8. fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
info!("Fetching prev events."); debug!("Fetching prev events.");
let previous = fetch_events(&db, origin, pub_key_map, &pdu.prev_events, auth_cache) let previous = fetch_events(&db, origin, pub_key_map, &pdu.prev_events, auth_cache)
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
@ -1049,7 +1043,7 @@ fn validate_event<'a>(
}; };
// Check that the event passes auth based on the auth_events // Check that the event passes auth based on the auth_events
info!("Checking auth."); debug!("Checking auth.");
let is_authed = state_res::event_auth::auth_check( let is_authed = state_res::event_auth::auth_check(
&RoomVersionId::Version6, &RoomVersionId::Version6,
&pdu, &pdu,
@ -1073,7 +1067,7 @@ fn validate_event<'a>(
return Err("Event has failed auth check with auth events".to_string()); return Err("Event has failed auth check with auth events".to_string());
} }
info!("Validation successful."); debug!("Validation successful.");
Ok((pdu, previous_create)) Ok((pdu, previous_create))
}) })
} }
@ -1111,20 +1105,19 @@ pub(crate) async fn fetch_events(
) -> Result<Vec<Arc<PduEvent>>> { ) -> Result<Vec<Arc<PduEvent>>> {
let mut pdus = vec![]; let mut pdus = vec![];
for id in events { for id in events {
info!("Fetching event: {}", id);
let pdu = match auth_cache.get(id) { let pdu = match auth_cache.get(id) {
Some(pdu) => { Some(pdu) => {
info!("Event found in cache"); debug!("Event found in cache");
pdu.clone() pdu.clone()
} }
// `get_pdu` checks the outliers tree for us // `get_pdu` checks the outliers tree for us
None => match db.rooms.get_pdu(&id)? { None => match db.rooms.get_pdu(&id)? {
Some(pdu) => { Some(pdu) => {
info!("Event found in outliers"); debug!("Event found in outliers");
Arc::new(pdu) Arc::new(pdu)
} }
None => { None => {
info!("Fetching event over federation"); debug!("Fetching event over federation");
match db match db
.sending .sending
.send_federation_request( .send_federation_request(
@ -1135,7 +1128,7 @@ pub(crate) async fn fetch_events(
.await .await
{ {
Ok(res) => { Ok(res) => {
info!("Got event over federation: {:?}", res); debug!("Got event over federation: {:?}", res);
let (event_id, value) = let (event_id, value) =
crate::pdu::gen_event_id_canonical_json(&res.pdu); crate::pdu::gen_event_id_canonical_json(&res.pdu);
let (pdu, _) = let (pdu, _) =
@ -1146,7 +1139,7 @@ pub(crate) async fn fetch_events(
Error::Conflict("Authentication of event failed") Error::Conflict("Authentication of event failed")
})?; })?;
info!("Added fetched pdu as outlier."); debug!("Added fetched pdu as outlier.");
db.rooms.add_pdu_outlier(&pdu)?; db.rooms.add_pdu_outlier(&pdu)?;
pdu pdu
} }
@ -1171,15 +1164,11 @@ pub(crate) async fn fetch_signing_keys(
let mut result = BTreeMap::new(); let mut result = BTreeMap::new();
match db.globals.signing_keys_for(origin)? { match db.globals.signing_keys_for(origin)? {
keys if !keys.is_empty() => { keys if !keys.is_empty() => Ok(keys
info!("we knew the signing keys already: {:?}", keys); .into_iter()
Ok(keys .map(|(k, v)| (k.to_string(), v.key))
.into_iter() .collect()),
.map(|(k, v)| (k.to_string(), v.key))
.collect())
}
_ => { _ => {
info!("Asking {} for it's signing key", origin);
match db match db
.sending .sending
.send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new()) .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
@ -1204,7 +1193,7 @@ pub(crate) async fn fetch_signing_keys(
} }
_ => { _ => {
for server in db.globals.trusted_servers() { for server in db.globals.trusted_servers() {
info!("Asking {} for {}'s signing key", server, origin); debug!("Asking {} for {}'s signing key", server, origin);
if let Ok(keys) = db if let Ok(keys) = db
.sending .sending
.send_federation_request( .send_federation_request(
@ -1219,7 +1208,7 @@ pub(crate) async fn fetch_signing_keys(
) )
.await .await
{ {
info!("Got signing keys: {:?}", keys); debug!("Got signing keys: {:?}", keys);
for k in keys.server_keys.into_iter() { for k in keys.server_keys.into_iter() {
db.globals.add_signing_key(origin, &k)?; db.globals.add_signing_key(origin, &k)?;
result.extend( result.extend(
@ -1364,7 +1353,6 @@ pub(crate) async fn build_forward_extremity_snapshots(
fork_states.insert(current_state); fork_states.insert(current_state);
} }
info!("Fork states: {:?}", fork_states);
Ok(fork_states) Ok(fork_states)
} }
@ -1548,7 +1536,10 @@ pub fn get_missing_events_route<'a>(
) )
.map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?, .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?,
); );
events.push(serde_json::from_value(pdu).expect("Raw<..> is always valid")); events.push(PduEvent::convert_to_outgoing_federation_event(
serde_json::from_value(pdu)
.map_err(|_| Error::bad_database("Invalid pdu in database."))?,
));
} }
i += 1; i += 1;
} }