fix: send transaction retry code
This commit is contained in:
		
							parent
							
								
									dd0b2597b7
								
							
						
					
					
						commit
						3dfc245633
					
				
					 1 changed files with 79 additions and 57 deletions
				
			
		|  | @ -10,7 +10,7 @@ use crate::{ | ||||||
|     appservice_server, database::pusher, server_server, utils, Database, Error, PduEvent, Result, |     appservice_server, database::pusher, server_server, utils, Database, Error, PduEvent, Result, | ||||||
| }; | }; | ||||||
| use federation::transactions::send_transaction_message; | use federation::transactions::send_transaction_message; | ||||||
| use log::warn; | use log::{error, warn}; | ||||||
| use ring::digest; | use ring::digest; | ||||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||||
| use ruma::{ | use ruma::{ | ||||||
|  | @ -32,10 +32,16 @@ pub enum OutgoingKind { | ||||||
| pub struct Sending { | pub struct Sending { | ||||||
|     /// The state for a given state hash.
 |     /// The state for a given state hash.
 | ||||||
|     pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
 |     pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
 | ||||||
|     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation)
 |     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId
 | ||||||
|     pub(super) maximum_requests: Arc<Semaphore>, |     pub(super) maximum_requests: Arc<Semaphore>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | enum TransactionStatus { | ||||||
|  |     Running, | ||||||
|  |     Failed(u32, Instant), // number of times failed, time of last failure
 | ||||||
|  |     Retrying(u32),        // number of times failed
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
| impl Sending { | impl Sending { | ||||||
|     pub fn start_handler(&self, db: &Database) { |     pub fn start_handler(&self, db: &Database) { | ||||||
|         let servernamepduids = self.servernamepduids.clone(); |         let servernamepduids = self.servernamepduids.clone(); | ||||||
|  | @ -47,8 +53,10 @@ impl Sending { | ||||||
|             let mut futures = FuturesUnordered::new(); |             let mut futures = FuturesUnordered::new(); | ||||||
| 
 | 
 | ||||||
|             // Retry requests we could not finish yet
 |             // Retry requests we could not finish yet
 | ||||||
|             let mut current_transactions = HashMap::<OutgoingKind, Vec<Vec<u8>>>::new(); |             let mut subscriber = servernamepduids.watch_prefix(b""); | ||||||
|  |             let mut current_transaction_status = HashMap::<Vec<u8>, TransactionStatus>::new(); | ||||||
| 
 | 
 | ||||||
|  |             let mut initial_transactions = HashMap::<OutgoingKind, Vec<Vec<u8>>>::new(); | ||||||
|             for (key, outgoing_kind, pdu) in servercurrentpdus |             for (key, outgoing_kind, pdu) in servercurrentpdus | ||||||
|                 .iter() |                 .iter() | ||||||
|                 .filter_map(|r| r.ok()) |                 .filter_map(|r| r.ok()) | ||||||
|  | @ -58,18 +66,15 @@ impl Sending { | ||||||
|                         .map(|(k, p)| (key, k, p.to_vec())) |                         .map(|(k, p)| (key, k, p.to_vec())) | ||||||
|                 }) |                 }) | ||||||
|             { |             { | ||||||
|                 if pdu.is_empty() { |                 let entry = initial_transactions | ||||||
|                     // Remove old reservation key
 |                     .entry(outgoing_kind.clone()) | ||||||
|                     servercurrentpdus.remove(key).unwrap(); |  | ||||||
|                     continue; |  | ||||||
|                 } |  | ||||||
| 
 |  | ||||||
|                 let entry = current_transactions |  | ||||||
|                     .entry(outgoing_kind) |  | ||||||
|                     .or_insert_with(Vec::new); |                     .or_insert_with(Vec::new); | ||||||
| 
 | 
 | ||||||
|                 if entry.len() > 30 { |                 if entry.len() > 30 { | ||||||
|                     warn!("Dropping some current pdus because too many were queued. This should not happen."); |                     warn!( | ||||||
|  |                         "Dropping some current pdu: {:?} {:?} {:?}", | ||||||
|  |                         key, outgoing_kind, pdu | ||||||
|  |                     ); | ||||||
|                     servercurrentpdus.remove(key).unwrap(); |                     servercurrentpdus.remove(key).unwrap(); | ||||||
|                     continue; |                     continue; | ||||||
|                 } |                 } | ||||||
|  | @ -77,8 +82,7 @@ impl Sending { | ||||||
|                 entry.push(pdu); |                 entry.push(pdu); | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             for (outgoing_kind, pdus) in current_transactions { |             for (outgoing_kind, pdus) in initial_transactions { | ||||||
|                 // Create new reservation
 |  | ||||||
|                 let mut prefix = match &outgoing_kind { |                 let mut prefix = match &outgoing_kind { | ||||||
|                     OutgoingKind::Appservice(server) => { |                     OutgoingKind::Appservice(server) => { | ||||||
|                         let mut p = b"+".to_vec(); |                         let mut p = b"+".to_vec(); | ||||||
|  | @ -99,14 +103,10 @@ impl Sending { | ||||||
|                     } |                     } | ||||||
|                 }; |                 }; | ||||||
|                 prefix.push(0xff); |                 prefix.push(0xff); | ||||||
|                 servercurrentpdus.insert(prefix, &[]).unwrap(); |                 current_transaction_status.insert(prefix, TransactionStatus::Running); | ||||||
| 
 |  | ||||||
|                 futures.push(Self::handle_event(outgoing_kind.clone(), pdus, &db)); |                 futures.push(Self::handle_event(outgoing_kind.clone(), pdus, &db)); | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             let mut last_failed_try: HashMap<OutgoingKind, (u32, Instant)> = HashMap::new(); |  | ||||||
| 
 |  | ||||||
|             let mut subscriber = servernamepduids.watch_prefix(b""); |  | ||||||
|             loop { |             loop { | ||||||
|                 select! { |                 select! { | ||||||
|                     Some(response) = futures.next() => { |                     Some(response) = futures.next() => { | ||||||
|  | @ -138,10 +138,7 @@ impl Sending { | ||||||
|                                     .keys() |                                     .keys() | ||||||
|                                     .filter_map(|r| r.ok()) |                                     .filter_map(|r| r.ok()) | ||||||
|                                 { |                                 { | ||||||
|                                     // Don't remove reservation yet
 |                                     servercurrentpdus.remove(key).unwrap(); | ||||||
|                                     if prefix.len() != key.len() { |  | ||||||
|                                         servercurrentpdus.remove(key).unwrap(); |  | ||||||
|                                     } |  | ||||||
|                                 } |                                 } | ||||||
| 
 | 
 | ||||||
|                                 // Find events that have been added since starting the last request
 |                                 // Find events that have been added since starting the last request
 | ||||||
|  | @ -171,8 +168,7 @@ impl Sending { | ||||||
|                                         ) |                                         ) | ||||||
|                                     ); |                                     ); | ||||||
|                                 } else { |                                 } else { | ||||||
|                                     servercurrentpdus.remove(&prefix).unwrap(); |                                     current_transaction_status.remove(&prefix); | ||||||
|                                     // servercurrentpdus with the prefix should be empty now
 |  | ||||||
|                                 } |                                 } | ||||||
|                             } |                             } | ||||||
|                             Err((outgoing_kind, _)) => { |                             Err((outgoing_kind, _)) => { | ||||||
|  | @ -198,15 +194,14 @@ impl Sending { | ||||||
| 
 | 
 | ||||||
|                                 prefix.push(0xff); |                                 prefix.push(0xff); | ||||||
| 
 | 
 | ||||||
|                                 last_failed_try.insert(outgoing_kind.clone(), match last_failed_try.get(&outgoing_kind) { |                                 current_transaction_status.entry(prefix).and_modify(|e| *e = match e { | ||||||
|                                     Some(last_failed) => { |                                     TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), | ||||||
|                                         (last_failed.0+1, Instant::now()) |                                     TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), | ||||||
|  |                                     TransactionStatus::Failed(_, _) => { | ||||||
|  |                                         error!("Request that was not even running failed?!"); | ||||||
|  |                                         return | ||||||
|                                     }, |                                     }, | ||||||
|                                     None => { |  | ||||||
|                                         (1, Instant::now()) |  | ||||||
|                                     } |  | ||||||
|                                 }); |                                 }); | ||||||
|                                 servercurrentpdus.remove(&prefix).unwrap(); |  | ||||||
|                             } |                             } | ||||||
|                         }; |                         }; | ||||||
|                     }, |                     }, | ||||||
|  | @ -220,24 +215,12 @@ impl Sending { | ||||||
| 
 | 
 | ||||||
|                             let servernamepduid = key.clone(); |                             let servernamepduid = key.clone(); | ||||||
| 
 | 
 | ||||||
|                             let exponential_backoff = |(tries, instant): &(u32, Instant)| { |                             let mut retry = false; | ||||||
|                                 // Fail if a request has failed recently (exponential backoff)
 |  | ||||||
|                                 let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); |  | ||||||
|                                 if min_elapsed_duration > Duration::from_secs(60*60*24) { |  | ||||||
|                                     min_elapsed_duration = Duration::from_secs(60*60*24); |  | ||||||
|                                 } |  | ||||||
| 
 | 
 | ||||||
|                                 instant.elapsed() < min_elapsed_duration |                             if let Some((outgoing_kind, prefix, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) | ||||||
|                             }; |  | ||||||
| 
 |  | ||||||
|                             if let Some((outgoing_kind, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) |  | ||||||
|                                 .ok() |                                 .ok() | ||||||
|                                 .filter(|(outgoing_kind, _)| { |                                 .map(|(outgoing_kind, pdu_id)| { | ||||||
|                                     if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) { |                                     let mut prefix = match &outgoing_kind { | ||||||
|                                         return false; |  | ||||||
|                                     } |  | ||||||
| 
 |  | ||||||
|                                     let mut prefix = match outgoing_kind { |  | ||||||
|                                         OutgoingKind::Appservice(serv) => { |                                         OutgoingKind::Appservice(serv) => { | ||||||
|                                             let mut p = b"+".to_vec(); |                                             let mut p = b"+".to_vec(); | ||||||
|                                             p.extend_from_slice(serv.as_bytes()); |                                             p.extend_from_slice(serv.as_bytes()); | ||||||
|  | @ -258,20 +241,59 @@ impl Sending { | ||||||
|                                     }; |                                     }; | ||||||
|                                     prefix.push(0xff); |                                     prefix.push(0xff); | ||||||
| 
 | 
 | ||||||
|                                     servercurrentpdus |                                     (outgoing_kind, prefix, pdu_id) | ||||||
|                                         .compare_and_swap(prefix, Option::<&[u8]>::None, Some(&[])) // Try to reserve
 |                                 }) | ||||||
|                                         == Ok(Ok(())) |                                 .filter(|(_, prefix, _)| { | ||||||
|  |                                     let entry = current_transaction_status.entry(prefix.clone()); | ||||||
|  |                                     let mut allow = true; | ||||||
|  | 
 | ||||||
|  |                                     entry.and_modify(|e| match e { | ||||||
|  |                                         TransactionStatus::Running | TransactionStatus::Retrying(_) => { | ||||||
|  |                                             allow = false; // already running
 | ||||||
|  |                                         }, | ||||||
|  |                                         TransactionStatus::Failed(tries, time) => { | ||||||
|  |                                             // Fail if a request has failed recently (exponential backoff)
 | ||||||
|  |                                             let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); | ||||||
|  |                                             if min_elapsed_duration > Duration::from_secs(60*60*24) { | ||||||
|  |                                                 min_elapsed_duration = Duration::from_secs(60*60*24); | ||||||
|  |                                             } | ||||||
|  | 
 | ||||||
|  |                                             if time.elapsed() < min_elapsed_duration { | ||||||
|  |                                                 allow = false; | ||||||
|  |                                             } else { | ||||||
|  |                                                 retry = true; | ||||||
|  |                                                 *e = TransactionStatus::Retrying(*tries); | ||||||
|  |                                             } | ||||||
|  |                                         } | ||||||
|  |                                     }).or_insert(TransactionStatus::Running); | ||||||
|  | 
 | ||||||
|  |                                     allow | ||||||
|                                 }) |                                 }) | ||||||
|                             { |                             { | ||||||
|                                 servercurrentpdus.insert(&key, &[]).unwrap(); |                                 let mut pdus = Vec::new(); | ||||||
|                                 servernamepduids.remove(&key).unwrap(); |  | ||||||
| 
 |  | ||||||
|                                 last_failed_try.remove(&outgoing_kind); |  | ||||||
| 
 | 
 | ||||||
|  |                                 if retry { | ||||||
|  |                                     // We retry the previous transaction
 | ||||||
|  |                                     for pdu in servercurrentpdus | ||||||
|  |                                         .scan_prefix(&prefix) | ||||||
|  |                                         .filter_map(|r| r.ok()) | ||||||
|  |                                         .filter_map(|(key, _)| { | ||||||
|  |                                             Self::parse_servercurrentpdus(&key) | ||||||
|  |                                                 .ok() | ||||||
|  |                                                 .map(|(_, p)| p.to_vec()) | ||||||
|  |                                         }) | ||||||
|  |                                     { | ||||||
|  |                                         pdus.push(pdu); | ||||||
|  |                                     } | ||||||
|  |                                 } else { | ||||||
|  |                                     servercurrentpdus.insert(&key, &[]).unwrap(); | ||||||
|  |                                     servernamepduids.remove(&key).unwrap(); | ||||||
|  |                                     pdus.push(pdu_id.to_vec()); | ||||||
|  |                                 } | ||||||
|                                 futures.push( |                                 futures.push( | ||||||
|                                     Self::handle_event( |                                     Self::handle_event( | ||||||
|                                         outgoing_kind, |                                         outgoing_kind, | ||||||
|                                         vec![pdu_id.to_vec()], |                                         pdus, | ||||||
|                                         &db, |                                         &db, | ||||||
|                                     ) |                                     ) | ||||||
|                                 ); |                                 ); | ||||||
|  | @ -342,7 +364,7 @@ impl Sending { | ||||||
|                                     ( |                                     ( | ||||||
|                                         server.clone(), |                                         server.clone(), | ||||||
|                                         Error::bad_database( |                                         Error::bad_database( | ||||||
|                                             "[Appservice] Event in servernamepduids not found in ", |                                             "[Appservice] Event in servernamepduids not found in db.", | ||||||
|                                         ), |                                         ), | ||||||
|                                     ) |                                     ) | ||||||
|                                 })? |                                 })? | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue