diff --git a/src/database/sending.rs b/src/database/sending.rs index ffd3ed6..5495b36 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -10,7 +10,7 @@ use crate::{ appservice_server, database::pusher, server_server, utils, Database, Error, PduEvent, Result, }; use federation::transactions::send_transaction_message; -use log::warn; +use log::{error, warn}; use ring::digest; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ @@ -32,10 +32,16 @@ pub enum OutgoingKind { pub struct Sending { /// The state for a given state hash. pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId - pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation) + pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId pub(super) maximum_requests: Arc, } +enum TransactionStatus { + Running, + Failed(u32, Instant), // number of times failed, time of last failure + Retrying(u32), // number of times failed +} + impl Sending { pub fn start_handler(&self, db: &Database) { let servernamepduids = self.servernamepduids.clone(); @@ -47,8 +53,10 @@ impl Sending { let mut futures = FuturesUnordered::new(); // Retry requests we could not finish yet - let mut current_transactions = HashMap::>>::new(); + let mut subscriber = servernamepduids.watch_prefix(b""); + let mut current_transaction_status = HashMap::, TransactionStatus>::new(); + let mut initial_transactions = HashMap::>>::new(); for (key, outgoing_kind, pdu) in servercurrentpdus .iter() .filter_map(|r| r.ok()) @@ -58,18 +66,15 @@ impl Sending { .map(|(k, p)| (key, k, p.to_vec())) }) { - if pdu.is_empty() { - // Remove old reservation key - servercurrentpdus.remove(key).unwrap(); - continue; - } - - let entry = current_transactions - .entry(outgoing_kind) + let entry = initial_transactions + .entry(outgoing_kind.clone()) .or_insert_with(Vec::new); if entry.len() > 30 { - warn!("Dropping some current pdus because too many were queued. This should not happen."); + warn!( + "Dropping some current pdu: {:?} {:?} {:?}", + key, outgoing_kind, pdu + ); servercurrentpdus.remove(key).unwrap(); continue; } @@ -77,8 +82,7 @@ impl Sending { entry.push(pdu); } - for (outgoing_kind, pdus) in current_transactions { - // Create new reservation + for (outgoing_kind, pdus) in initial_transactions { let mut prefix = match &outgoing_kind { OutgoingKind::Appservice(server) => { let mut p = b"+".to_vec(); @@ -99,14 +103,10 @@ impl Sending { } }; prefix.push(0xff); - servercurrentpdus.insert(prefix, &[]).unwrap(); - + current_transaction_status.insert(prefix, TransactionStatus::Running); futures.push(Self::handle_event(outgoing_kind.clone(), pdus, &db)); } - let mut last_failed_try: HashMap = HashMap::new(); - - let mut subscriber = servernamepduids.watch_prefix(b""); loop { select! { Some(response) = futures.next() => { @@ -138,10 +138,7 @@ impl Sending { .keys() .filter_map(|r| r.ok()) { - // Don't remove reservation yet - if prefix.len() != key.len() { - servercurrentpdus.remove(key).unwrap(); - } + servercurrentpdus.remove(key).unwrap(); } // Find events that have been added since starting the last request @@ -171,8 +168,7 @@ impl Sending { ) ); } else { - servercurrentpdus.remove(&prefix).unwrap(); - // servercurrentpdus with the prefix should be empty now + current_transaction_status.remove(&prefix); } } Err((outgoing_kind, _)) => { @@ -198,15 +194,14 @@ impl Sending { prefix.push(0xff); - last_failed_try.insert(outgoing_kind.clone(), match last_failed_try.get(&outgoing_kind) { - Some(last_failed) => { - (last_failed.0+1, Instant::now()) + current_transaction_status.entry(prefix).and_modify(|e| *e = match e { + TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), + TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), + TransactionStatus::Failed(_, _) => { + error!("Request that was not even running failed?!"); + return }, - None => { - (1, Instant::now()) - } }); - servercurrentpdus.remove(&prefix).unwrap(); } }; }, @@ -220,24 +215,12 @@ impl Sending { let servernamepduid = key.clone(); - let exponential_backoff = |(tries, instant): &(u32, Instant)| { - // Fail if a request has failed recently (exponential backoff) - let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); - if min_elapsed_duration > Duration::from_secs(60*60*24) { - min_elapsed_duration = Duration::from_secs(60*60*24); - } + let mut retry = false; - instant.elapsed() < min_elapsed_duration - }; - - if let Some((outgoing_kind, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) + if let Some((outgoing_kind, prefix, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) .ok() - .filter(|(outgoing_kind, _)| { - if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) { - return false; - } - - let mut prefix = match outgoing_kind { + .map(|(outgoing_kind, pdu_id)| { + let mut prefix = match &outgoing_kind { OutgoingKind::Appservice(serv) => { let mut p = b"+".to_vec(); p.extend_from_slice(serv.as_bytes()); @@ -258,20 +241,59 @@ impl Sending { }; prefix.push(0xff); - servercurrentpdus - .compare_and_swap(prefix, Option::<&[u8]>::None, Some(&[])) // Try to reserve - == Ok(Ok(())) + (outgoing_kind, prefix, pdu_id) + }) + .filter(|(_, prefix, _)| { + let entry = current_transaction_status.entry(prefix.clone()); + let mut allow = true; + + entry.and_modify(|e| match e { + TransactionStatus::Running | TransactionStatus::Retrying(_) => { + allow = false; // already running + }, + TransactionStatus::Failed(tries, time) => { + // Fail if a request has failed recently (exponential backoff) + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60*60*24) { + min_elapsed_duration = Duration::from_secs(60*60*24); + } + + if time.elapsed() < min_elapsed_duration { + allow = false; + } else { + retry = true; + *e = TransactionStatus::Retrying(*tries); + } + } + }).or_insert(TransactionStatus::Running); + + allow }) { - servercurrentpdus.insert(&key, &[]).unwrap(); - servernamepduids.remove(&key).unwrap(); - - last_failed_try.remove(&outgoing_kind); + let mut pdus = Vec::new(); + if retry { + // We retry the previous transaction + for pdu in servercurrentpdus + .scan_prefix(&prefix) + .filter_map(|r| r.ok()) + .filter_map(|(key, _)| { + Self::parse_servercurrentpdus(&key) + .ok() + .map(|(_, p)| p.to_vec()) + }) + { + pdus.push(pdu); + } + } else { + servercurrentpdus.insert(&key, &[]).unwrap(); + servernamepduids.remove(&key).unwrap(); + pdus.push(pdu_id.to_vec()); + } futures.push( Self::handle_event( outgoing_kind, - vec![pdu_id.to_vec()], + pdus, &db, ) ); @@ -342,7 +364,7 @@ impl Sending { ( server.clone(), Error::bad_database( - "[Appservice] Event in servernamepduids not found in ", + "[Appservice] Event in servernamepduids not found in db.", ), ) })?