diff --git a/Cargo.toml b/Cargo.toml index 13bcd9b..78496e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,3 +106,7 @@ conf-files = [ ] maintainer-scripts = "debian/" systemd-units = { unit-name = "matrix-conduit" } + +# For flamegraphs: +#[profile.release] +#debug = true diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index e31e582..63c103d 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -568,7 +568,13 @@ async fn join_room_by_id_helper( { let (event_id, value) = match result { Ok(t) => t, - Err(_) => continue, + Err(e) => { + warn!( + "PDU could not be verified: {:?} {:?} {:?}", + e, event_id, pdu + ); + continue; + } }; let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| { diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 658dfb8..0bc67d4 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -111,11 +111,7 @@ pub async fn create_room_route( } } - let power_levels_content = if let Some(power_levels) = &body.power_level_content_override { - serde_json::from_str(power_levels.json().get()).map_err(|_| { - Error::BadRequest(ErrorKind::BadJson, "Invalid power_level_content_override.") - })? - } else { + let mut power_levels_content = serde_json::to_value(ruma::events::room::power_levels::PowerLevelsEventContent { ban: 50.into(), events: BTreeMap::new(), @@ -130,8 +126,21 @@ pub async fn create_room_route( room: 50.into(), }, }) - .expect("event is valid, we just created it") - }; + .expect("event is valid, we just created it"); + + if let Some(power_level_content_override) = &body.power_level_content_override { + let json = serde_json::from_str::>( + power_level_content_override.json().get(), + ) + .map_err(|_| { + Error::BadRequest(ErrorKind::BadJson, "Invalid power_level_content_override.") + })?; + + for (key, value) in json { + power_levels_content[key] = value; + } + } + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomPowerLevels, diff --git a/src/database.rs b/src/database.rs index 6504f9c..62b3a40 100644 --- a/src/database.rs +++ b/src/database.rs @@ -198,7 +198,7 @@ impl Database { }, sending: sending::Sending { servernamepduids: db.open_tree("servernamepduids")?, - servercurrentpdus: db.open_tree("servercurrentpdus")?, + servercurrentevents: db.open_tree("servercurrentevents")?, maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), }, admin: admin::Admin { @@ -217,6 +217,9 @@ impl Database { _db: db, }; + // This data is probably outdated + db.rooms.edus.presenceid_presence.clear()?; + db.admin.start_handler(db.clone(), admin_receiver); Ok(db) diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index f69e897..56000e0 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -6,6 +6,7 @@ use ruma::{ }, presence::PresenceState, serde::Raw, + signatures::CanonicalJsonObject, RoomId, UInt, UserId, }; use std::{ @@ -88,9 +89,13 @@ impl RoomEdus { .filter_map(|r| r.ok()) .take_while(move |(k, _)| k.starts_with(&prefix)) .map(|(_, v)| { - Ok(serde_json::from_slice(&v).map_err(|_| { - Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid.") - })?) + let mut json = serde_json::from_slice::(&v).map_err(|_| { + Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") + })?; + json.remove("room_id"); + Ok(Raw::from_json( + serde_json::value::to_raw_value(&json).expect("json is valid raw value"), + )) })) } diff --git a/src/database/sending.rs b/src/database/sending.rs index 5495b36..e530396 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -28,11 +28,44 @@ pub enum OutgoingKind { Normal(Box), } +impl OutgoingKind { + pub fn get_prefix(&self) -> Vec { + let mut prefix = match self { + OutgoingKind::Appservice(server) => { + let mut p = b"+".to_vec(); + p.extend_from_slice(server.as_bytes()); + p + } + OutgoingKind::Push(user, pushkey) => { + let mut p = b"$".to_vec(); + p.extend_from_slice(&user); + p.push(0xff); + p.extend_from_slice(&pushkey); + p + } + OutgoingKind::Normal(server) => { + let mut p = Vec::new(); + p.extend_from_slice(server.as_bytes()); + p + } + }; + prefix.push(0xff); + + prefix + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum SendingEventType { + Pdu(Vec), + Edu(Vec), +} + #[derive(Clone)] pub struct Sending { /// The state for a given state hash. pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId - pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId + pub(super) servercurrentevents: sled::Tree, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / (*)EduEvent pub(super) maximum_requests: Arc, } @@ -45,7 +78,7 @@ enum TransactionStatus { impl Sending { pub fn start_handler(&self, db: &Database) { let servernamepduids = self.servernamepduids.clone(); - let servercurrentpdus = self.servercurrentpdus.clone(); + let servercurrentevents = self.servercurrentevents.clone(); let db = db.clone(); @@ -56,14 +89,14 @@ impl Sending { let mut subscriber = servernamepduids.watch_prefix(b""); let mut current_transaction_status = HashMap::, TransactionStatus>::new(); - let mut initial_transactions = HashMap::>>::new(); - for (key, outgoing_kind, pdu) in servercurrentpdus + let mut initial_transactions = HashMap::>::new(); + for (key, outgoing_kind, event) in servercurrentevents .iter() .filter_map(|r| r.ok()) .filter_map(|(key, _)| { - Self::parse_servercurrentpdus(&key) + Self::parse_servercurrentevent(&key) .ok() - .map(|(k, p)| (key, k, p.to_vec())) + .map(|(k, e)| (key, k, e)) }) { let entry = initial_transactions @@ -72,39 +105,20 @@ impl Sending { if entry.len() > 30 { warn!( - "Dropping some current pdu: {:?} {:?} {:?}", - key, outgoing_kind, pdu + "Dropping some current events: {:?} {:?} {:?}", + key, outgoing_kind, event ); - servercurrentpdus.remove(key).unwrap(); + servercurrentevents.remove(key).unwrap(); continue; } - entry.push(pdu); + entry.push(event); } - for (outgoing_kind, pdus) in initial_transactions { - let mut prefix = match &outgoing_kind { - OutgoingKind::Appservice(server) => { - let mut p = b"+".to_vec(); - p.extend_from_slice(server.as_bytes()); - p - } - OutgoingKind::Push(user, pushkey) => { - let mut p = b"$".to_vec(); - p.extend_from_slice(&user); - p.push(0xff); - p.extend_from_slice(&pushkey); - p - } - OutgoingKind::Normal(server) => { - let mut p = Vec::new(); - p.extend_from_slice(server.as_bytes()); - p - } - }; - prefix.push(0xff); - current_transaction_status.insert(prefix, TransactionStatus::Running); - futures.push(Self::handle_event(outgoing_kind.clone(), pdus, &db)); + for (outgoing_kind, events) in initial_transactions { + current_transaction_status + .insert(outgoing_kind.get_prefix(), TransactionStatus::Running); + futures.push(Self::handle_events(outgoing_kind.clone(), events, &db)); } loop { @@ -112,58 +126,46 @@ impl Sending { Some(response) = futures.next() => { match response { Ok(outgoing_kind) => { - let mut prefix = match &outgoing_kind { - OutgoingKind::Appservice(server) => { - let mut p = b"+".to_vec(); - p.extend_from_slice(server.as_bytes()); - p - } - OutgoingKind::Push(user, pushkey) => { - let mut p = b"$".to_vec(); - p.extend_from_slice(&user); - p.push(0xff); - p.extend_from_slice(&pushkey); - p - }, - OutgoingKind::Normal(server) => { - let mut p = vec![]; - p.extend_from_slice(server.as_bytes()); - p - }, - }; - prefix.push(0xff); - - for key in servercurrentpdus + let prefix = outgoing_kind.get_prefix(); + for key in servercurrentevents .scan_prefix(&prefix) .keys() .filter_map(|r| r.ok()) { - servercurrentpdus.remove(key).unwrap(); + servercurrentevents.remove(key).unwrap(); } // Find events that have been added since starting the last request - let new_pdus = servernamepduids + let new_events = servernamepduids .scan_prefix(&prefix) .keys() .filter_map(|r| r.ok()) .map(|k| { - k[prefix.len()..].to_vec() + SendingEventType::Pdu(k[prefix.len()..].to_vec()) }) .take(30) .collect::>(); - if !new_pdus.is_empty() { - for pdu_id in &new_pdus { + // TODO: find edus + + if !new_events.is_empty() { + // Insert pdus we found + for event in &new_events { let mut current_key = prefix.clone(); - current_key.extend_from_slice(pdu_id); - servercurrentpdus.insert(¤t_key, &[]).unwrap(); - servernamepduids.remove(¤t_key).unwrap(); + match event { + SendingEventType::Pdu(b) | + SendingEventType::Edu(b) => { + current_key.extend_from_slice(&b); + servercurrentevents.insert(¤t_key, &[]).unwrap(); + servernamepduids.remove(¤t_key).unwrap(); + } + } } futures.push( - Self::handle_event( + Self::handle_events( outgoing_kind.clone(), - new_pdus, + new_events, &db, ) ); @@ -172,29 +174,7 @@ impl Sending { } } Err((outgoing_kind, _)) => { - let mut prefix = match &outgoing_kind { - OutgoingKind::Appservice(serv) => { - let mut p = b"+".to_vec(); - p.extend_from_slice(serv.as_bytes()); - p - }, - OutgoingKind::Push(user, pushkey) => { - let mut p = b"$".to_vec(); - p.extend_from_slice(&user); - p.push(0xff); - p.extend_from_slice(&pushkey); - p - }, - OutgoingKind::Normal(serv) => { - let mut p = vec![]; - p.extend_from_slice(serv.as_bytes()); - p - }, - }; - - prefix.push(0xff); - - current_transaction_status.entry(prefix).and_modify(|e| *e = match e { + current_transaction_status.entry(outgoing_kind.get_prefix()).and_modify(|e| *e = match e { TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), TransactionStatus::Failed(_, _) => { @@ -206,97 +186,17 @@ impl Sending { }; }, Some(event) = &mut subscriber => { - if let sled::Event::Insert { key, .. } = event { // New sled version: //for (_tree, key, value_opt) in &event { // if value_opt.is_none() { // continue; // } - let servernamepduid = key.clone(); - - let mut retry = false; - - if let Some((outgoing_kind, prefix, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) - .ok() - .map(|(outgoing_kind, pdu_id)| { - let mut prefix = match &outgoing_kind { - OutgoingKind::Appservice(serv) => { - let mut p = b"+".to_vec(); - p.extend_from_slice(serv.as_bytes()); - p - }, - OutgoingKind::Push(user, pushkey) => { - let mut p = b"$".to_vec(); - p.extend_from_slice(&user); - p.push(0xff); - p.extend_from_slice(&pushkey); - p - }, - OutgoingKind::Normal(serv) => { - let mut p = vec![]; - p.extend_from_slice(serv.as_bytes()); - p - }, - }; - prefix.push(0xff); - - (outgoing_kind, prefix, pdu_id) - }) - .filter(|(_, prefix, _)| { - let entry = current_transaction_status.entry(prefix.clone()); - let mut allow = true; - - entry.and_modify(|e| match e { - TransactionStatus::Running | TransactionStatus::Retrying(_) => { - allow = false; // already running - }, - TransactionStatus::Failed(tries, time) => { - // Fail if a request has failed recently (exponential backoff) - let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60*60*24) { - min_elapsed_duration = Duration::from_secs(60*60*24); - } - - if time.elapsed() < min_elapsed_duration { - allow = false; - } else { - retry = true; - *e = TransactionStatus::Retrying(*tries); - } - } - }).or_insert(TransactionStatus::Running); - - allow - }) - { - let mut pdus = Vec::new(); - - if retry { - // We retry the previous transaction - for pdu in servercurrentpdus - .scan_prefix(&prefix) - .filter_map(|r| r.ok()) - .filter_map(|(key, _)| { - Self::parse_servercurrentpdus(&key) - .ok() - .map(|(_, p)| p.to_vec()) - }) - { - pdus.push(pdu); - } - } else { - servercurrentpdus.insert(&key, &[]).unwrap(); - servernamepduids.remove(&key).unwrap(); - pdus.push(pdu_id.to_vec()); + if let sled::Event::Insert { key, .. } = event { + if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key) { + if let Some(events) = Self::select_events(&outgoing_kind, vec![(event, key)], &mut current_transaction_status, &servercurrentevents, &servernamepduids) { + futures.push(Self::handle_events(outgoing_kind, events, &db)); } - futures.push( - Self::handle_event( - outgoing_kind, - pdus, - &db, - ) - ); } } } @@ -305,6 +205,73 @@ impl Sending { }); } + fn select_events( + outgoing_kind: &OutgoingKind, + new_events: Vec<(SendingEventType, IVec)>, // Events we want to send: event and full key + current_transaction_status: &mut HashMap, TransactionStatus>, + servercurrentevents: &sled::Tree, + servernamepduids: &sled::Tree, + ) -> Option> { + let mut retry = false; + let mut allow = true; + + let prefix = outgoing_kind.get_prefix(); + let entry = current_transaction_status.entry(prefix.clone()); + + entry + .and_modify(|e| match e { + TransactionStatus::Running | TransactionStatus::Retrying(_) => { + allow = false; // already running + } + TransactionStatus::Failed(tries, time) => { + // Fail if a request has failed recently (exponential backoff) + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + allow = false; + } else { + retry = true; + *e = TransactionStatus::Retrying(*tries); + } + } + }) + .or_insert(TransactionStatus::Running); + + if !allow { + return None; + } + + let mut events = Vec::new(); + + if retry { + // We retry the previous transaction + for key in servercurrentevents + .scan_prefix(&prefix) + .keys() + .filter_map(|r| r.ok()) + { + if let Ok((_, e)) = Self::parse_servercurrentevent(&key) { + events.push(e); + } + } + } else { + for (e, full_key) in new_events { + servercurrentevents.insert(&full_key, &[]).unwrap(); + + // If it was a PDU we have to unqueue it + // TODO: don't try to unqueue EDUs + servernamepduids.remove(&full_key).unwrap(); + + events.push(e); + } + } + + Some(events) + } + #[tracing::instrument(skip(self))] pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: IVec) -> Result<()> { let mut key = b"$".to_vec(); @@ -338,7 +305,7 @@ impl Sending { } #[tracing::instrument] - fn calculate_hash(keys: &[Vec]) -> Vec { + fn calculate_hash(keys: &[&[u8]]) -> Vec { // We only hash the pdu's event ids, not the whole pdu let bytes = keys.join(&0xff); let hash = digest::digest(&digest::SHA256, &bytes); @@ -346,33 +313,37 @@ impl Sending { } #[tracing::instrument(skip(db))] - async fn handle_event( + async fn handle_events( kind: OutgoingKind, - pdu_ids: Vec>, + events: Vec, db: &Database, ) -> std::result::Result { match &kind { OutgoingKind::Appservice(server) => { - let pdu_jsons = pdu_ids - .iter() - .map(|pdu_id| { - Ok::<_, (Box, Error)>( - db.rooms - .get_pdu_from_id(pdu_id) - .map_err(|e| (server.clone(), e))? + let mut pdu_jsons = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + pdu_jsons.push(db.rooms + .get_pdu_from_id(&pdu_id) + .map_err(|e| (kind.clone(), e))? .ok_or_else(|| { ( - server.clone(), + kind.clone(), Error::bad_database( "[Appservice] Event in servernamepduids not found in db.", ), ) })? - .to_any_event(), - ) - }) - .filter_map(|r| r.ok()) - .collect::>(); + .to_any_event()) + } + SendingEventType::Edu(_) => { + // Appservices don't need EDUs (?) + } + } + } + let permit = db.sending.maximum_requests.acquire().await; let response = appservice_server::send_request( @@ -384,7 +355,14 @@ impl Sending { appservice::event::push_events::v1::Request { events: &pdu_jsons, txn_id: &base64::encode_config( - Self::calculate_hash(&pdu_ids), + Self::calculate_hash( + &events + .iter() + .map(|e| match e { + SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, + }) + .collect::>(), + ), base64::URL_SAFE_NO_PAD, ), }, @@ -398,25 +376,30 @@ impl Sending { response } OutgoingKind::Push(user, pushkey) => { - let pdus = pdu_ids - .iter() - .map(|pdu_id| { - Ok::<_, (Vec, Error)>( - db.rooms - .get_pdu_from_id(pdu_id) - .map_err(|e| (pushkey.clone(), e))? - .ok_or_else(|| { - ( - pushkey.clone(), - Error::bad_database( - "[Push] Event in servernamepduids not found in db.", - ), - ) - })?, - ) - }) - .filter_map(|r| r.ok()) - .collect::>(); + let mut pdus = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { + pdus.push( + db.rooms + .get_pdu_from_id(&pdu_id) + .map_err(|e| (kind.clone(), e))? + .ok_or_else(|| { + ( + kind.clone(), + Error::bad_database( + "[Push] Event in servernamepduids not found in db.", + ), + ) + })?, + ); + } + SendingEventType::Edu(_) => { + // Push gateways don't need EDUs (?) + } + } + } for pdu in pdus { // Redacted events are not notification targets (we don't send push for them) @@ -427,13 +410,13 @@ impl Sending { let userid = UserId::try_from(utils::string_from_bytes(user).map_err(|_| { ( - OutgoingKind::Push(user.clone(), pushkey.clone()), + kind.clone(), Error::bad_database("Invalid push user string in db."), ) })?) .map_err(|_| { ( - OutgoingKind::Push(user.clone(), pushkey.clone()), + kind.clone(), Error::bad_database("Invalid push user id in db."), ) })?; @@ -484,15 +467,17 @@ impl Sending { Ok(OutgoingKind::Push(user.clone(), pushkey.clone())) } OutgoingKind::Normal(server) => { - let pdu_jsons = pdu_ids - .iter() - .map(|pdu_id| { - Ok::<_, (OutgoingKind, Error)>( + let mut edu_jsons = Vec::new(); + let mut pdu_jsons = Vec::new(); + + for event in &events { + match event { + SendingEventType::Pdu(pdu_id) => { // TODO: check room version and remove event_id if needed - serde_json::from_str( + pdu_jsons.push(serde_json::from_str( PduEvent::convert_to_outgoing_federation_event( db.rooms - .get_pdu_json_from_id(pdu_id) + .get_pdu_json_from_id(&pdu_id) .map_err(|e| (OutgoingKind::Normal(server.clone()), e))? .ok_or_else(|| { ( @@ -506,11 +491,15 @@ impl Sending { .json() .get(), ) - .expect("Raw<..> is always valid"), - ) - }) - .filter_map(|r| r.ok()) - .collect::>(); + .expect("Raw<..> is always valid")); + } + SendingEventType::Edu(edu) => { + edu_jsons.push( + serde_json::from_slice(edu).expect("Raw<..> is always valid"), + ); + } + } + } let permit = db.sending.maximum_requests.acquire().await; @@ -520,10 +509,17 @@ impl Sending { send_transaction_message::v1::Request { origin: db.globals.server_name(), pdus: &pdu_jsons, - edus: &[], + edus: &edu_jsons, origin_server_ts: SystemTime::now(), transaction_id: &base64::encode_config( - Self::calculate_hash(&pdu_ids), + Self::calculate_hash( + &events + .iter() + .map(|e| match e { + SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, + }) + .collect::>(), + ), base64::URL_SAFE_NO_PAD, ), }, @@ -546,13 +542,13 @@ impl Sending { } } - fn parse_servercurrentpdus(key: &IVec) -> Result<(OutgoingKind, IVec)> { + fn parse_servercurrentevent(key: &IVec) -> Result<(OutgoingKind, SendingEventType)> { // Appservices start with a plus Ok::<_, Error>(if key.starts_with(b"+") { let mut parts = key[1..].splitn(2, |&b| b == 0xff); let server = parts.next().expect("splitn always returns one element"); - let pdu = parts + let event = parts .next() .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; let server = utils::string_from_bytes(&server).map_err(|_| { @@ -563,7 +559,11 @@ impl Sending { OutgoingKind::Appservice(Box::::try_from(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") })?), - IVec::from(pdu), + if event.starts_with(b"*") { + SendingEventType::Edu(event[1..].to_vec()) + } else { + SendingEventType::Pdu(event.to_vec()) + }, ) } else if key.starts_with(b"$") { let mut parts = key[1..].splitn(3, |&b| b == 0xff); @@ -572,18 +572,22 @@ impl Sending { let pushkey = parts .next() .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; - let pdu = parts + let event = parts .next() .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; ( OutgoingKind::Push(user.to_vec(), pushkey.to_vec()), - IVec::from(pdu), + if event.starts_with(b"*") { + SendingEventType::Edu(event[1..].to_vec()) + } else { + SendingEventType::Pdu(event.to_vec()) + }, ) } else { let mut parts = key.splitn(2, |&b| b == 0xff); let server = parts.next().expect("splitn always returns one element"); - let pdu = parts + let event = parts .next() .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; let server = utils::string_from_bytes(&server).map_err(|_| { @@ -594,7 +598,11 @@ impl Sending { OutgoingKind::Normal(Box::::try_from(server).map_err(|_| { Error::bad_database("Invalid server string in server_currenttransaction") })?), - IVec::from(pdu), + if event.starts_with(b"*") { + SendingEventType::Edu(event[1..].to_vec()) + } else { + SendingEventType::Pdu(event.to_vec()) + }, ) }) } diff --git a/src/error.rs b/src/error.rs index 65c5b4f..6c37bed 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -use log::error; +use log::{error, warn}; use ruma::api::client::{error::ErrorKind, r0::uiaa::UiaaInfo}; use thiserror::Error; @@ -92,6 +92,8 @@ where _ => (Unknown, StatusCode::INTERNAL_SERVER_ERROR), }; + warn!("{}: {}", status_code, message); + RumaResponse::from(RumaError { kind, message, diff --git a/src/main.rs b/src/main.rs index 1c058b0..87928cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -205,13 +205,16 @@ async fn main() { let root = span!(tracing::Level::INFO, "app_start", work_units = 2); let _enter = root.enter(); + + let rocket = setup_rocket(raw_config, db); + rocket.launch().await.unwrap(); } else { std::env::set_var("CONDUIT_LOG", config.log); pretty_env_logger::init_custom_env("CONDUIT_LOG"); - } - let rocket = setup_rocket(raw_config, db); - rocket.launch().await.unwrap(); + let rocket = setup_rocket(raw_config, db); + rocket.launch().await.unwrap(); + } } #[catch(404)] diff --git a/src/server_server.rs b/src/server_server.rs index fa460bf..77eb448 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -27,11 +27,12 @@ use ruma::{ }, directory::{IncomingFilter, IncomingRoomNetwork}, events::{ + receipt::{ReceiptEvent, ReceiptEventContent}, room::{ create::CreateEventContent, member::{MemberEventContent, MembershipState}, }, - EventType, + AnyEphemeralRoomEvent, AnyEvent as EduEvent, EventType, }, serde::Raw, signatures::{CanonicalJsonObject, CanonicalJsonValue}, @@ -585,35 +586,6 @@ pub async fn send_transaction_message_route<'a>( return Err(Error::bad_config("Federation is disabled.")); } - for edu in body - .edus - .iter() - .map(|edu| serde_json::from_str::(edu.json().get())) - .filter_map(|r| r.ok()) - { - match edu { - Edu::Presence(_) => {} - Edu::Receipt(_) => {} - Edu::Typing(typing) => { - if typing.typing { - db.rooms.edus.typing_add( - &typing.user_id, - &typing.room_id, - 3000 + utils::millis_since_unix_epoch(), - &db.globals, - )?; - } else { - db.rooms - .edus - .typing_remove(&typing.user_id, &typing.room_id, &db.globals)?; - } - } - Edu::DeviceListUpdate(_) => {} - Edu::DirectToDevice(_) => {} - Edu::_Custom(_) => {} - } - } - let mut resolved_map = BTreeMap::new(); let pub_key_map = RwLock::new(BTreeMap::new()); @@ -659,6 +631,73 @@ pub async fn send_transaction_message_route<'a>( } } + for edu in body + .edus + .iter() + .map(|edu| serde_json::from_str::(edu.json().get())) + .filter_map(|r| r.ok()) + { + match edu { + Edu::Presence(_) => {} + Edu::Receipt(receipt) => { + for (room_id, room_updates) in receipt.receipts { + for (user_id, user_updates) in room_updates.read { + if let Some((event_id, _)) = user_updates + .event_ids + .iter() + .filter_map(|id| { + db.rooms.get_pdu_count(&id).ok().flatten().map(|r| (id, r)) + }) + .max_by_key(|(_, count)| *count) + { + let mut user_receipts = BTreeMap::new(); + user_receipts.insert(user_id.clone(), user_updates.data); + + let mut receipt_content = BTreeMap::new(); + receipt_content.insert( + event_id.to_owned(), + ruma::events::receipt::Receipts { + read: Some(user_receipts), + }, + ); + + let event = + EduEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt(ReceiptEvent { + content: ReceiptEventContent(receipt_content), + room_id: room_id.clone(), + })); + db.rooms.edus.readreceipt_update( + &user_id, + &room_id, + event, + &db.globals, + )?; + } else { + warn!("No known event ids in read receipt: {:?}", user_updates); + } + } + } + } + Edu::Typing(typing) => { + if typing.typing { + db.rooms.edus.typing_add( + &typing.user_id, + &typing.room_id, + 3000 + utils::millis_since_unix_epoch(), + &db.globals, + )?; + } else { + db.rooms + .edus + .typing_remove(&typing.user_id, &typing.room_id, &db.globals)?; + } + } + Edu::DeviceListUpdate(_) => {} + Edu::DirectToDevice(_) => {} + Edu::_Custom(_) => {} + } + } + Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) } @@ -1134,7 +1173,7 @@ pub fn handle_incoming_pdu<'a>( .await { // This should always contain exactly one element when Ok - Ok(events) => state_auth.push(events[0].clone()), + Ok(events) => state_auth.extend_from_slice(&events), Err(e) => { debug!("Event was not present: {}", e); }