Merge branch 'sending' into 'master'
fix: send transaction retry code See merge request famedly/conduit!68
This commit is contained in:
		
						commit
						abe176e3d0
					
				
					 1 changed files with 79 additions and 57 deletions
				
			
		|  | @ -10,7 +10,7 @@ use crate::{ | |||
|     appservice_server, database::pusher, server_server, utils, Database, Error, PduEvent, Result, | ||||
| }; | ||||
| use federation::transactions::send_transaction_message; | ||||
| use log::warn; | ||||
| use log::{error, warn}; | ||||
| use ring::digest; | ||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||
| use ruma::{ | ||||
|  | @ -32,10 +32,16 @@ pub enum OutgoingKind { | |||
| pub struct Sending { | ||||
|     /// The state for a given state hash.
 | ||||
|     pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
 | ||||
|     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation)
 | ||||
|     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId
 | ||||
|     pub(super) maximum_requests: Arc<Semaphore>, | ||||
| } | ||||
| 
 | ||||
| enum TransactionStatus { | ||||
|     Running, | ||||
|     Failed(u32, Instant), // number of times failed, time of last failure
 | ||||
|     Retrying(u32),        // number of times failed
 | ||||
| } | ||||
| 
 | ||||
| impl Sending { | ||||
|     pub fn start_handler(&self, db: &Database) { | ||||
|         let servernamepduids = self.servernamepduids.clone(); | ||||
|  | @ -47,8 +53,10 @@ impl Sending { | |||
|             let mut futures = FuturesUnordered::new(); | ||||
| 
 | ||||
|             // Retry requests we could not finish yet
 | ||||
|             let mut current_transactions = HashMap::<OutgoingKind, Vec<Vec<u8>>>::new(); | ||||
|             let mut subscriber = servernamepduids.watch_prefix(b""); | ||||
|             let mut current_transaction_status = HashMap::<Vec<u8>, TransactionStatus>::new(); | ||||
| 
 | ||||
|             let mut initial_transactions = HashMap::<OutgoingKind, Vec<Vec<u8>>>::new(); | ||||
|             for (key, outgoing_kind, pdu) in servercurrentpdus | ||||
|                 .iter() | ||||
|                 .filter_map(|r| r.ok()) | ||||
|  | @ -58,18 +66,15 @@ impl Sending { | |||
|                         .map(|(k, p)| (key, k, p.to_vec())) | ||||
|                 }) | ||||
|             { | ||||
|                 if pdu.is_empty() { | ||||
|                     // Remove old reservation key
 | ||||
|                     servercurrentpdus.remove(key).unwrap(); | ||||
|                     continue; | ||||
|                 } | ||||
| 
 | ||||
|                 let entry = current_transactions | ||||
|                     .entry(outgoing_kind) | ||||
|                 let entry = initial_transactions | ||||
|                     .entry(outgoing_kind.clone()) | ||||
|                     .or_insert_with(Vec::new); | ||||
| 
 | ||||
|                 if entry.len() > 30 { | ||||
|                     warn!("Dropping some current pdus because too many were queued. This should not happen."); | ||||
|                     warn!( | ||||
|                         "Dropping some current pdu: {:?} {:?} {:?}", | ||||
|                         key, outgoing_kind, pdu | ||||
|                     ); | ||||
|                     servercurrentpdus.remove(key).unwrap(); | ||||
|                     continue; | ||||
|                 } | ||||
|  | @ -77,8 +82,7 @@ impl Sending { | |||
|                 entry.push(pdu); | ||||
|             } | ||||
| 
 | ||||
|             for (outgoing_kind, pdus) in current_transactions { | ||||
|                 // Create new reservation
 | ||||
|             for (outgoing_kind, pdus) in initial_transactions { | ||||
|                 let mut prefix = match &outgoing_kind { | ||||
|                     OutgoingKind::Appservice(server) => { | ||||
|                         let mut p = b"+".to_vec(); | ||||
|  | @ -99,14 +103,10 @@ impl Sending { | |||
|                     } | ||||
|                 }; | ||||
|                 prefix.push(0xff); | ||||
|                 servercurrentpdus.insert(prefix, &[]).unwrap(); | ||||
| 
 | ||||
|                 current_transaction_status.insert(prefix, TransactionStatus::Running); | ||||
|                 futures.push(Self::handle_event(outgoing_kind.clone(), pdus, &db)); | ||||
|             } | ||||
| 
 | ||||
|             let mut last_failed_try: HashMap<OutgoingKind, (u32, Instant)> = HashMap::new(); | ||||
| 
 | ||||
|             let mut subscriber = servernamepduids.watch_prefix(b""); | ||||
|             loop { | ||||
|                 select! { | ||||
|                     Some(response) = futures.next() => { | ||||
|  | @ -138,10 +138,7 @@ impl Sending { | |||
|                                     .keys() | ||||
|                                     .filter_map(|r| r.ok()) | ||||
|                                 { | ||||
|                                     // Don't remove reservation yet
 | ||||
|                                     if prefix.len() != key.len() { | ||||
|                                         servercurrentpdus.remove(key).unwrap(); | ||||
|                                     } | ||||
|                                     servercurrentpdus.remove(key).unwrap(); | ||||
|                                 } | ||||
| 
 | ||||
|                                 // Find events that have been added since starting the last request
 | ||||
|  | @ -171,8 +168,7 @@ impl Sending { | |||
|                                         ) | ||||
|                                     ); | ||||
|                                 } else { | ||||
|                                     servercurrentpdus.remove(&prefix).unwrap(); | ||||
|                                     // servercurrentpdus with the prefix should be empty now
 | ||||
|                                     current_transaction_status.remove(&prefix); | ||||
|                                 } | ||||
|                             } | ||||
|                             Err((outgoing_kind, _)) => { | ||||
|  | @ -198,15 +194,14 @@ impl Sending { | |||
| 
 | ||||
|                                 prefix.push(0xff); | ||||
| 
 | ||||
|                                 last_failed_try.insert(outgoing_kind.clone(), match last_failed_try.get(&outgoing_kind) { | ||||
|                                     Some(last_failed) => { | ||||
|                                         (last_failed.0+1, Instant::now()) | ||||
|                                 current_transaction_status.entry(prefix).and_modify(|e| *e = match e { | ||||
|                                     TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), | ||||
|                                     TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), | ||||
|                                     TransactionStatus::Failed(_, _) => { | ||||
|                                         error!("Request that was not even running failed?!"); | ||||
|                                         return | ||||
|                                     }, | ||||
|                                     None => { | ||||
|                                         (1, Instant::now()) | ||||
|                                     } | ||||
|                                 }); | ||||
|                                 servercurrentpdus.remove(&prefix).unwrap(); | ||||
|                             } | ||||
|                         }; | ||||
|                     }, | ||||
|  | @ -220,24 +215,12 @@ impl Sending { | |||
| 
 | ||||
|                             let servernamepduid = key.clone(); | ||||
| 
 | ||||
|                             let exponential_backoff = |(tries, instant): &(u32, Instant)| { | ||||
|                                 // Fail if a request has failed recently (exponential backoff)
 | ||||
|                                 let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); | ||||
|                                 if min_elapsed_duration > Duration::from_secs(60*60*24) { | ||||
|                                     min_elapsed_duration = Duration::from_secs(60*60*24); | ||||
|                                 } | ||||
|                             let mut retry = false; | ||||
| 
 | ||||
|                                 instant.elapsed() < min_elapsed_duration | ||||
|                             }; | ||||
| 
 | ||||
|                             if let Some((outgoing_kind, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) | ||||
|                             if let Some((outgoing_kind, prefix, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) | ||||
|                                 .ok() | ||||
|                                 .filter(|(outgoing_kind, _)| { | ||||
|                                     if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) { | ||||
|                                         return false; | ||||
|                                     } | ||||
| 
 | ||||
|                                     let mut prefix = match outgoing_kind { | ||||
|                                 .map(|(outgoing_kind, pdu_id)| { | ||||
|                                     let mut prefix = match &outgoing_kind { | ||||
|                                         OutgoingKind::Appservice(serv) => { | ||||
|                                             let mut p = b"+".to_vec(); | ||||
|                                             p.extend_from_slice(serv.as_bytes()); | ||||
|  | @ -258,20 +241,59 @@ impl Sending { | |||
|                                     }; | ||||
|                                     prefix.push(0xff); | ||||
| 
 | ||||
|                                     servercurrentpdus | ||||
|                                         .compare_and_swap(prefix, Option::<&[u8]>::None, Some(&[])) // Try to reserve
 | ||||
|                                         == Ok(Ok(())) | ||||
|                                     (outgoing_kind, prefix, pdu_id) | ||||
|                                 }) | ||||
|                                 .filter(|(_, prefix, _)| { | ||||
|                                     let entry = current_transaction_status.entry(prefix.clone()); | ||||
|                                     let mut allow = true; | ||||
| 
 | ||||
|                                     entry.and_modify(|e| match e { | ||||
|                                         TransactionStatus::Running | TransactionStatus::Retrying(_) => { | ||||
|                                             allow = false; // already running
 | ||||
|                                         }, | ||||
|                                         TransactionStatus::Failed(tries, time) => { | ||||
|                                             // Fail if a request has failed recently (exponential backoff)
 | ||||
|                                             let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); | ||||
|                                             if min_elapsed_duration > Duration::from_secs(60*60*24) { | ||||
|                                                 min_elapsed_duration = Duration::from_secs(60*60*24); | ||||
|                                             } | ||||
| 
 | ||||
|                                             if time.elapsed() < min_elapsed_duration { | ||||
|                                                 allow = false; | ||||
|                                             } else { | ||||
|                                                 retry = true; | ||||
|                                                 *e = TransactionStatus::Retrying(*tries); | ||||
|                                             } | ||||
|                                         } | ||||
|                                     }).or_insert(TransactionStatus::Running); | ||||
| 
 | ||||
|                                     allow | ||||
|                                 }) | ||||
|                             { | ||||
|                                 servercurrentpdus.insert(&key, &[]).unwrap(); | ||||
|                                 servernamepduids.remove(&key).unwrap(); | ||||
| 
 | ||||
|                                 last_failed_try.remove(&outgoing_kind); | ||||
|                                 let mut pdus = Vec::new(); | ||||
| 
 | ||||
|                                 if retry { | ||||
|                                     // We retry the previous transaction
 | ||||
|                                     for pdu in servercurrentpdus | ||||
|                                         .scan_prefix(&prefix) | ||||
|                                         .filter_map(|r| r.ok()) | ||||
|                                         .filter_map(|(key, _)| { | ||||
|                                             Self::parse_servercurrentpdus(&key) | ||||
|                                                 .ok() | ||||
|                                                 .map(|(_, p)| p.to_vec()) | ||||
|                                         }) | ||||
|                                     { | ||||
|                                         pdus.push(pdu); | ||||
|                                     } | ||||
|                                 } else { | ||||
|                                     servercurrentpdus.insert(&key, &[]).unwrap(); | ||||
|                                     servernamepduids.remove(&key).unwrap(); | ||||
|                                     pdus.push(pdu_id.to_vec()); | ||||
|                                 } | ||||
|                                 futures.push( | ||||
|                                     Self::handle_event( | ||||
|                                         outgoing_kind, | ||||
|                                         vec![pdu_id.to_vec()], | ||||
|                                         pdus, | ||||
|                                         &db, | ||||
|                                     ) | ||||
|                                 ); | ||||
|  | @ -342,7 +364,7 @@ impl Sending { | |||
|                                     ( | ||||
|                                         server.clone(), | ||||
|                                         Error::bad_database( | ||||
|                                             "[Appservice] Event in servernamepduids not found in ", | ||||
|                                             "[Appservice] Event in servernamepduids not found in db.", | ||||
|                                         ), | ||||
|                                     ) | ||||
|                                 })? | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue