fix: don't send new requests to servers if we are already waiting
This commit is contained in:
		
							parent
							
								
									e08dfd982b
								
							
						
					
					
						commit
						ab332363ce
					
				
					 3 changed files with 117 additions and 53 deletions
				
			
		
							
								
								
									
										15
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										15
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							|  | @ -1547,6 +1547,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma" | name = "ruma" | ||||||
| version = "0.0.1" | version = "0.0.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "ruma-api", |  "ruma-api", | ||||||
|  "ruma-appservice-api", |  "ruma-appservice-api", | ||||||
|  | @ -1562,6 +1563,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-api" | name = "ruma-api" | ||||||
| version = "0.17.0-alpha.1" | version = "0.17.0-alpha.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "http", |  "http", | ||||||
|  "percent-encoding", |  "percent-encoding", | ||||||
|  | @ -1576,6 +1578,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-api-macros" | name = "ruma-api-macros" | ||||||
| version = "0.17.0-alpha.1" | version = "0.17.0-alpha.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "proc-macro-crate", |  "proc-macro-crate", | ||||||
|  "proc-macro2", |  "proc-macro2", | ||||||
|  | @ -1586,6 +1589,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-appservice-api" | name = "ruma-appservice-api" | ||||||
| version = "0.2.0-alpha.1" | version = "0.2.0-alpha.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "ruma-api", |  "ruma-api", | ||||||
|  "ruma-common", |  "ruma-common", | ||||||
|  | @ -1598,6 +1602,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-client-api" | name = "ruma-client-api" | ||||||
| version = "0.10.0-alpha.1" | version = "0.10.0-alpha.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "assign", |  "assign", | ||||||
|  "http", |  "http", | ||||||
|  | @ -1616,6 +1621,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-common" | name = "ruma-common" | ||||||
| version = "0.2.0" | version = "0.2.0" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "js_int", |  "js_int", | ||||||
|  "ruma-api", |  "ruma-api", | ||||||
|  | @ -1629,6 +1635,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-events" | name = "ruma-events" | ||||||
| version = "0.22.0-alpha.1" | version = "0.22.0-alpha.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "js_int", |  "js_int", | ||||||
|  "ruma-common", |  "ruma-common", | ||||||
|  | @ -1643,6 +1650,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-events-macros" | name = "ruma-events-macros" | ||||||
| version = "0.22.0-alpha.1" | version = "0.22.0-alpha.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "proc-macro-crate", |  "proc-macro-crate", | ||||||
|  "proc-macro2", |  "proc-macro2", | ||||||
|  | @ -1653,6 +1661,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-federation-api" | name = "ruma-federation-api" | ||||||
| version = "0.0.3" | version = "0.0.3" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "js_int", |  "js_int", | ||||||
|  "ruma-api", |  "ruma-api", | ||||||
|  | @ -1667,6 +1676,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-identifiers" | name = "ruma-identifiers" | ||||||
| version = "0.17.4" | version = "0.17.4" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "rand", |  "rand", | ||||||
|  "ruma-identifiers-macros", |  "ruma-identifiers-macros", | ||||||
|  | @ -1678,6 +1688,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-identifiers-macros" | name = "ruma-identifiers-macros" | ||||||
| version = "0.17.4" | version = "0.17.4" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "proc-macro2", |  "proc-macro2", | ||||||
|  "quote", |  "quote", | ||||||
|  | @ -1688,6 +1699,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-identifiers-validation" | name = "ruma-identifiers-validation" | ||||||
| version = "0.1.1" | version = "0.1.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "serde", |  "serde", | ||||||
|  "strum", |  "strum", | ||||||
|  | @ -1696,6 +1708,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-serde" | name = "ruma-serde" | ||||||
| version = "0.2.3" | version = "0.2.3" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "form_urlencoded", |  "form_urlencoded", | ||||||
|  "itoa", |  "itoa", | ||||||
|  | @ -1707,6 +1720,7 @@ dependencies = [ | ||||||
| [[package]] | [[package]] | ||||||
| name = "ruma-signatures" | name = "ruma-signatures" | ||||||
| version = "0.6.0-dev.1" | version = "0.6.0-dev.1" | ||||||
|  | source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "base64", |  "base64", | ||||||
|  "ring", |  "ring", | ||||||
|  | @ -1956,6 +1970,7 @@ checksum = "7345c971d1ef21ffdbd103a75990a15eb03604fc8b8852ca8cb418ee1a099028" | ||||||
| [[package]] | [[package]] | ||||||
| name = "state-res" | name = "state-res" | ||||||
| version = "0.1.0" | version = "0.1.0" | ||||||
|  | source = "git+https://github.com/timokoesters/state-res?branch=spec-comp#d11a3feb5307715ab5d86af8f25d4bccfee6264b" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "itertools", |  "itertools", | ||||||
|  "js_int", |  "js_int", | ||||||
|  |  | ||||||
|  | @ -1,8 +1,11 @@ | ||||||
| use std::{convert::TryFrom, time::SystemTime}; | use std::{collections::HashSet, convert::TryFrom, time::SystemTime}; | ||||||
| 
 | 
 | ||||||
| use crate::{server_server, utils, Error, Result}; | use crate::{server_server, utils, Error, Result}; | ||||||
|  | use federation::transactions::send_transaction_message; | ||||||
|  | use log::warn; | ||||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||||
| use ruma::{api::federation, Raw, ServerName}; | use ruma::{api::federation, Raw, ServerName}; | ||||||
|  | use sled::IVec; | ||||||
| use tokio::select; | use tokio::select; | ||||||
| 
 | 
 | ||||||
| pub struct Sending { | pub struct Sending { | ||||||
|  | @ -18,66 +21,49 @@ impl Sending { | ||||||
| 
 | 
 | ||||||
|         tokio::spawn(async move { |         tokio::spawn(async move { | ||||||
|             let mut futures = FuturesUnordered::new(); |             let mut futures = FuturesUnordered::new(); | ||||||
|  |             let mut waiting_servers = HashSet::new(); | ||||||
|  | 
 | ||||||
|             let mut subscriber = serverpduids.watch_prefix(b""); |             let mut subscriber = serverpduids.watch_prefix(b""); | ||||||
|             loop { |             loop { | ||||||
|                 select! { |                 select! { | ||||||
|                     Some(_) = futures.next() => {}, |                     Some(server) = futures.next() => { | ||||||
|                     Some(event) = &mut subscriber => { |                         warn!("response: {:?}", &server); | ||||||
|                         let serverpduid = if let sled::Event::Insert {key, ..} = event { |                         match server { | ||||||
|                             key |                             Ok((server, _response)) => { | ||||||
|                         } else |                                 waiting_servers.remove(&server) | ||||||
|                         { return Err::<(), Error>(Error::bad_database("")); }; |  | ||||||
|                         let mut parts = serverpduid.splitn(2, |&b| b == 0xff); |  | ||||||
|                         let server = Box::<ServerName>::try_from( |  | ||||||
|                             utils::string_from_bytes(parts.next().expect("splitn will always return 1 or more elements")) |  | ||||||
|                                 .map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid."))? |  | ||||||
|                             ).map_err(|_| Error::bad_database("ServerName in serverpduid is invalid."))?; |  | ||||||
| 
 |  | ||||||
|                         let pdu_id = parts.next().ok_or_else(|| Error::bad_database("Invalid serverpduid in db."))?; |  | ||||||
|                         let mut pdu_json = rooms.get_pdu_json_from_id(&pdu_id.into())?.ok_or_else(|| Error::bad_database("Event in serverpduids not found in db."))?; |  | ||||||
| 
 |  | ||||||
|                         if let Some(unsigned) = pdu_json |  | ||||||
|                             .as_object_mut() |  | ||||||
|                             .expect("json is object") |  | ||||||
|                             .get_mut("unsigned") { |  | ||||||
|                                 unsigned.as_object_mut().expect("unsigned is object").remove("transaction_id"); |  | ||||||
|                         } |  | ||||||
| 
 |  | ||||||
|                         pdu_json |  | ||||||
|                             .as_object_mut() |  | ||||||
|                             .expect("json is object") |  | ||||||
|                             .remove("event_id"); |  | ||||||
| 
 |  | ||||||
|                         let raw_json = |  | ||||||
|                             serde_json::from_value::<Raw<_>>(pdu_json).expect("Raw::from_value always works"); |  | ||||||
| 
 |  | ||||||
|                         let globals = &globals; |  | ||||||
| 
 |  | ||||||
|                         futures.push( |  | ||||||
|                             async move { |  | ||||||
|                                 let pdus = vec![raw_json]; |  | ||||||
|                                 let transaction_id = utils::random_string(16); |  | ||||||
| 
 |  | ||||||
|                                 server_server::send_request( |  | ||||||
|                                     &globals, |  | ||||||
|                                     server, |  | ||||||
|                                     federation::transactions::send_transaction_message::v1::Request { |  | ||||||
|                                         origin: globals.server_name(), |  | ||||||
|                                         pdus: &pdus, |  | ||||||
|                                         edus: &[], |  | ||||||
|                                         origin_server_ts: SystemTime::now(), |  | ||||||
|                                         transaction_id: &transaction_id, |  | ||||||
|                                     }, |  | ||||||
|                                 ).await |  | ||||||
|                             } |                             } | ||||||
|                         ); |                             Err((server, _e)) => { | ||||||
|  |                                 waiting_servers.remove(&server) | ||||||
|  |                             } | ||||||
|  |                         }; | ||||||
|                     }, |                     }, | ||||||
|  |                     Some(event) = &mut subscriber => { | ||||||
|  |                         if let sled::Event::Insert { key, .. } = event { | ||||||
|  |                             let serverpduid = key.clone(); | ||||||
|  |                             let mut parts = serverpduid.splitn(2, |&b| b == 0xff); | ||||||
|  | 
 | ||||||
|  |                             if let Some((server, pdu_id)) = utils::string_from_bytes( | ||||||
|  |                                     parts | ||||||
|  |                                         .next() | ||||||
|  |                                         .expect("splitn will always return 1 or more elements"), | ||||||
|  |                                 ) | ||||||
|  |                                 .map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid.")) | ||||||
|  |                                 .and_then(|server_str|Box::<ServerName>::try_from(server_str) | ||||||
|  |                                     .map_err(|_| Error::bad_database("ServerName in serverpduid is invalid."))) | ||||||
|  |                                 .ok() | ||||||
|  |                                 .filter(|server| waiting_servers.insert(server.clone())) | ||||||
|  |                                 .and_then(|server| parts | ||||||
|  |                                 .next() | ||||||
|  |                                 .ok_or_else(|| Error::bad_database("Invalid serverpduid in db.")).ok().map(|pdu_id| (server, pdu_id))) | ||||||
|  |                             { | ||||||
|  |                                 futures.push(Self::handle_event(server, pdu_id.into(), &globals, &rooms)); | ||||||
|  |                             } | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         }); |         }); | ||||||
|     } |     } | ||||||
|     /* |  | ||||||
|      */ |  | ||||||
| 
 | 
 | ||||||
|     pub fn send_pdu(&self, server: Box<ServerName>, pdu_id: &[u8]) -> Result<()> { |     pub fn send_pdu(&self, server: Box<ServerName>, pdu_id: &[u8]) -> Result<()> { | ||||||
|         let mut key = server.as_bytes().to_vec(); |         let mut key = server.as_bytes().to_vec(); | ||||||
|  | @ -87,4 +73,63 @@ impl Sending { | ||||||
| 
 | 
 | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     async fn handle_event( | ||||||
|  |         server: Box<ServerName>, | ||||||
|  |         pdu_id: IVec, | ||||||
|  |         globals: &super::globals::Globals, | ||||||
|  |         rooms: &super::rooms::Rooms, | ||||||
|  |     ) -> std::result::Result< | ||||||
|  |         (Box<ServerName>, send_transaction_message::v1::Response), | ||||||
|  |         (Box<ServerName>, Error), | ||||||
|  |     > { | ||||||
|  |         let mut pdu_json = rooms | ||||||
|  |             .get_pdu_json_from_id(&pdu_id) | ||||||
|  |             .map_err(|e| (server.clone(), e))? | ||||||
|  |             .ok_or_else(|| { | ||||||
|  |                 ( | ||||||
|  |                     server.clone(), | ||||||
|  |                     Error::bad_database("Event in serverpduids not found in db."), | ||||||
|  |                 ) | ||||||
|  |             })?; | ||||||
|  | 
 | ||||||
|  |         if let Some(unsigned) = pdu_json | ||||||
|  |             .as_object_mut() | ||||||
|  |             .expect("json is object") | ||||||
|  |             .get_mut("unsigned") | ||||||
|  |         { | ||||||
|  |             unsigned | ||||||
|  |                 .as_object_mut() | ||||||
|  |                 .expect("unsigned is object") | ||||||
|  |                 .remove("transaction_id"); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         pdu_json | ||||||
|  |             .as_object_mut() | ||||||
|  |             .expect("json is object") | ||||||
|  |             .remove("event_id"); | ||||||
|  | 
 | ||||||
|  |         let raw_json = | ||||||
|  |             serde_json::from_value::<Raw<_>>(pdu_json).expect("Raw::from_value always works"); | ||||||
|  | 
 | ||||||
|  |         let globals = &globals; | ||||||
|  | 
 | ||||||
|  |         let pdus = vec![raw_json]; | ||||||
|  |         let transaction_id = utils::random_string(16); | ||||||
|  | 
 | ||||||
|  |         server_server::send_request( | ||||||
|  |             &globals, | ||||||
|  |             server.clone(), | ||||||
|  |             send_transaction_message::v1::Request { | ||||||
|  |                 origin: globals.server_name(), | ||||||
|  |                 pdus: &pdus, | ||||||
|  |                 edus: &[], | ||||||
|  |                 origin_server_ts: SystemTime::now(), | ||||||
|  |                 transaction_id: &transaction_id, | ||||||
|  |             }, | ||||||
|  |         ) | ||||||
|  |         .await | ||||||
|  |         .map(|response| (server.clone(), response)) | ||||||
|  |         .map_err(|e| (server, e)) | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -161,6 +161,7 @@ where | ||||||
| 
 | 
 | ||||||
|     *reqwest_request.timeout_mut() = Some(Duration::from_secs(30)); |     *reqwest_request.timeout_mut() = Some(Duration::from_secs(30)); | ||||||
| 
 | 
 | ||||||
|  |     let url = reqwest_request.url().clone(); | ||||||
|     let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; |     let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; | ||||||
| 
 | 
 | ||||||
|     // Because reqwest::Response -> http::Response is complicated:
 |     // Because reqwest::Response -> http::Response is complicated:
 | ||||||
|  | @ -189,7 +190,10 @@ where | ||||||
|                     .expect("reqwest body is valid http body"), |                     .expect("reqwest body is valid http body"), | ||||||
|             ); |             ); | ||||||
|             response.map_err(|e| { |             response.map_err(|e| { | ||||||
|                 warn!("Server returned bad response: {:?}", e); |                 warn!( | ||||||
|  |                     "Server returned bad response {} ({}): {:?}", | ||||||
|  |                     destination, url, e | ||||||
|  |                 ); | ||||||
|                 Error::BadServerResponse("Server returned bad response.") |                 Error::BadServerResponse("Server returned bad response.") | ||||||
|             }) |             }) | ||||||
|         } |         } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue