From 585ca9fdf71a7d24c280b64a2bccb3a7b32052c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Sun, 31 Jan 2021 18:09:03 +0100 Subject: [PATCH 1/9] matrix-sdk: Split out the http errors into a sub-enum --- matrix_sdk/src/client.rs | 19 +++++---- matrix_sdk/src/error.rs | 77 ++++++++++++++++++----------------- matrix_sdk/src/http_client.rs | 33 +++++++++------ matrix_sdk/src/lib.rs | 2 +- 4 files changed, 70 insertions(+), 61 deletions(-) diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 2ea72ff1..6cdb7000 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -118,6 +118,7 @@ use matrix_sdk_common::{ }; use crate::{ + error::HttpError, http_client::{client_with_config, HttpClient, HttpSend}, Error, OutgoingRequest, Result, }; @@ -1451,7 +1452,7 @@ impl Client { content_type: Some(content_type.essence_str()), }); - self.http_client.upload(request).await + Ok(self.http_client.upload(request).await?) } /// Send an arbitrary request to the server, without updating client state. @@ -1494,9 +1495,9 @@ impl Client { pub async fn send(&self, request: Request) -> Result where Request: OutgoingRequest + Debug, - Error: From>, + HttpError: From>, { - self.http_client.send(request).await + Ok(self.http_client.send(request).await?) } #[cfg(feature = "encryption")] @@ -2276,7 +2277,7 @@ impl Client { #[cfg(test)] mod test { - use crate::ClientConfig; + use crate::{ClientConfig, HttpError}; use super::{ get_public_rooms, get_public_rooms_filtered, register::RegistrationKind, Client, @@ -2471,12 +2472,12 @@ mod test { .create(); if let Err(err) = client.login("example", "wordpass", None, None).await { - if let crate::Error::RumaResponse(crate::FromHttpResponseError::Http( - crate::ServerError::Known(crate::api::Error { + if let crate::Error::Http(HttpError::FromHttpResponse( + crate::FromHttpResponseError::Http(crate::ServerError::Known(crate::api::Error { kind, message, status_code, - }), + })), )) = err { if let crate::api::error::ErrorKind::Forbidden = kind { @@ -2517,10 +2518,10 @@ mod test { }); if let Err(err) = client.register(user).await { - if let crate::Error::UiaaError(crate::FromHttpResponseError::Http( + if let crate::Error::Http(HttpError::UiaaError(crate::FromHttpResponseError::Http( // TODO this should be a UiaaError need to investigate crate::ServerError::Unknown(e), - )) = err + ))) = err { assert!(e.to_string().starts_with("EOF while parsing")) } else { diff --git a/matrix_sdk/src/error.rs b/matrix_sdk/src/error.rs index 29b0d9b4..e4611628 100644 --- a/matrix_sdk/src/error.rs +++ b/matrix_sdk/src/error.rs @@ -20,7 +20,7 @@ use matrix_sdk_common::{ r0::uiaa::{UiaaInfo, UiaaResponse as UiaaError}, Error as RumaClientError, }, - FromHttpResponseError as RumaResponseError, IntoHttpError as RumaIntoHttpError, ServerError, + FromHttpResponseError, IntoHttpError, ServerError, }; use reqwest::Error as ReqwestError; use serde_json::Error as JsonError; @@ -33,9 +33,14 @@ use matrix_sdk_base::crypto::store::CryptoStoreError; /// Result type of the rust-sdk. pub type Result = std::result::Result; -/// Internal representation of errors. +/// An HTTP error, representing either a connection error or an error while +/// converting the raw HTTP response into a Matrix response. #[derive(Error, Debug)] -pub enum Error { +pub enum HttpError { + /// An error at the HTTP layer. + #[error(transparent)] + Reqwest(#[from] ReqwestError), + /// Queried endpoint requires authentication but was called on an anonymous client. #[error("the queried endpoint requires authentication but was called before logging in")] AuthenticationRequired, @@ -44,9 +49,33 @@ pub enum Error { #[error("the queried endpoint is not meant for clients")] NotClientRequest, - /// An error at the HTTP layer. + /// An error converting between ruma_client_api types and Hyper types. #[error(transparent)] - Reqwest(#[from] ReqwestError), + FromHttpResponse(#[from] FromHttpResponseError), + + /// An error converting between ruma_client_api types and Hyper types. + #[error(transparent)] + IntoHttp(#[from] IntoHttpError), + + /// An error occurred while authenticating. + /// + /// When registering or authenticating the Matrix server can send a `UiaaResponse` + /// as the error type, this is a User-Interactive Authentication API response. This + /// represents an error with information about how to authenticate the user. + #[error(transparent)] + UiaaError(#[from] FromHttpResponseError), +} + +/// Internal representation of errors. +#[derive(Error, Debug)] +pub enum Error { + /// Error doing an HTTP request. + #[error(transparent)] + Http(#[from] HttpError), + + /// Queried endpoint requires authentication but was called on an anonymous client. + #[error("the queried endpoint requires authentication but was called before logging in")] + AuthenticationRequired, /// An error de/serializing type for the `StateStore` #[error(transparent)] @@ -56,14 +85,6 @@ pub enum Error { #[error(transparent)] IO(#[from] IoError), - /// An error converting between ruma_client_api types and Hyper types. - #[error("can't parse the JSON response as a Matrix response")] - RumaResponse(RumaResponseError), - - /// An error converting between ruma_client_api types and Hyper types. - #[error("can't convert between ruma_client_api and hyper types.")] - IntoHttp(RumaIntoHttpError), - /// An error occurred in the Matrix client library. #[error(transparent)] MatrixError(#[from] MatrixError), @@ -76,14 +97,6 @@ pub enum Error { /// An error occured in the state store. #[error(transparent)] StateStore(#[from] StoreError), - - /// An error occurred while authenticating. - /// - /// When registering or authenticating the Matrix server can send a `UiaaResponse` - /// as the error type, this is a User-Interactive Authentication API response. This - /// represents an error with information about how to authenticate the user. - #[error("User-Interactive Authentication required.")] - UiaaError(RumaResponseError), } impl Error { @@ -99,9 +112,9 @@ impl Error { /// This method is an convenience method to get to the info the server /// returned on the first, failed request. pub fn uiaa_response(&self) -> Option<&UiaaInfo> { - if let Error::UiaaError(RumaResponseError::Http(ServerError::Known( + if let Error::Http(HttpError::UiaaError(FromHttpResponseError::Http(ServerError::Known( UiaaError::AuthResponse(i), - ))) = self + )))) = self { Some(i) } else { @@ -110,20 +123,8 @@ impl Error { } } -impl From> for Error { - fn from(error: RumaResponseError) -> Self { - Self::UiaaError(error) - } -} - -impl From> for Error { - fn from(error: RumaResponseError) -> Self { - Self::RumaResponse(error) - } -} - -impl From for Error { - fn from(error: RumaIntoHttpError) -> Self { - Self::IntoHttp(error) +impl From for Error { + fn from(e: ReqwestError) -> Self { + Error::Http(HttpError::Reqwest(e)) } } diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index fd84a424..64f83cc2 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -24,7 +24,7 @@ use matrix_sdk_common::{ FromHttpResponseError, }; -use crate::{ClientConfig, Error, OutgoingRequest, Result, Session}; +use crate::{error::HttpError, ClientConfig, OutgoingRequest, Session}; /// Abstraction around the http layer. The allows implementors to use different /// http libraries. @@ -74,7 +74,7 @@ pub trait HttpSend: AsyncTraitDeps { async fn send_request( &self, request: http::Request>, - ) -> Result>>; + ) -> Result>, HttpError>; } #[derive(Clone, Debug)] @@ -90,7 +90,7 @@ impl HttpClient { request: Request, session: Arc>>, content_type: Option, - ) -> Result>> { + ) -> Result>, HttpError> { let mut request = { let read_guard; let access_token = match Request::METADATA.authentication { @@ -100,11 +100,11 @@ impl HttpClient { if let Some(session) = read_guard.as_ref() { Some(session.access_token.as_str()) } else { - return Err(Error::AuthenticationRequired); + return Err(HttpError::AuthenticationRequired); } } AuthScheme::None => None, - _ => return Err(Error::NotClientRequest), + _ => return Err(HttpError::NotClientRequest), }; request.try_into_http_request(&self.homeserver.to_string(), access_token)? @@ -124,17 +124,20 @@ impl HttpClient { pub async fn upload( &self, request: create_content::Request<'_>, - ) -> Result { + ) -> Result { let response = self .send_request(request, self.session.clone(), None) .await?; Ok(create_content::Response::try_from(response)?) } - pub async fn send(&self, request: Request) -> Result + pub async fn send( + &self, + request: Request, + ) -> Result where - Request: OutgoingRequest, - Error: From>, + Request: OutgoingRequest + Debug, + HttpError: From>, { let content_type = HeaderValue::from_static("application/json"); let response = self @@ -143,12 +146,14 @@ impl HttpClient { trace!("Got response: {:?}", response); - Ok(Request::IncomingResponse::try_from(response)?) + let response = Request::IncomingResponse::try_from(response)?; + + Ok(response) } } /// Build a client with the specified configuration. -pub(crate) fn client_with_config(config: &ClientConfig) -> Result { +pub(crate) fn client_with_config(config: &ClientConfig) -> Result { let http_client = reqwest::Client::builder(); #[cfg(not(target_arch = "wasm32"))] @@ -188,7 +193,9 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result { Ok(http_client.build()?) } -async fn response_to_http_response(mut response: Response) -> Result>> { +async fn response_to_http_response( + mut response: Response, +) -> Result>, reqwest::Error> { let status = response.status(); let mut http_builder = HttpResponse::builder().status(status); @@ -211,7 +218,7 @@ impl HttpSend for Client { async fn send_request( &self, request: http::Request>, - ) -> Result>> { + ) -> Result>, HttpError> { Ok( response_to_http_response(self.execute(reqwest::Request::try_from(request)?).await?) .await?, diff --git a/matrix_sdk/src/lib.rs b/matrix_sdk/src/lib.rs index 8528e920..23e14f8f 100644 --- a/matrix_sdk/src/lib.rs +++ b/matrix_sdk/src/lib.rs @@ -90,7 +90,7 @@ pub use client::{Client, ClientConfig, LoopCtrl, SyncSettings}; #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub use device::Device; -pub use error::{Error, Result}; +pub use error::{Error, HttpError, Result}; pub use http_client::HttpSend; #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] From 42ec456abf3415724e05df00d4d33e3f5d3de65a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Sun, 31 Jan 2021 21:10:30 +0100 Subject: [PATCH 2/9] matrix-sdk: Add initial support for request retrying --- matrix_sdk/Cargo.toml | 4 +++ matrix_sdk/src/error.rs | 9 +++++ matrix_sdk/src/http_client.rs | 62 +++++++++++++++++++++++++++++++---- 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index 648e358c..d9e8d358 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -50,6 +50,10 @@ default_features = false version = "0.11.0" default_features = false +[dependencies.backoff] +git = "https://github.com/ihrwein/backoff" +features = ["tokio"] + [dependencies.tracing-futures] version = "0.2.4" default-features = false diff --git a/matrix_sdk/src/error.rs b/matrix_sdk/src/error.rs index e4611628..1c2f9e57 100644 --- a/matrix_sdk/src/error.rs +++ b/matrix_sdk/src/error.rs @@ -14,6 +14,7 @@ //! Error conditions. +use http::StatusCode; use matrix_sdk_base::{Error as MatrixError, StoreError}; use matrix_sdk_common::{ api::{ @@ -64,6 +65,14 @@ pub enum HttpError { /// represents an error with information about how to authenticate the user. #[error(transparent)] UiaaError(#[from] FromHttpResponseError), + + /// The server returned a status code that should be retried. + #[error("Server returned an error {0}")] + Server(StatusCode), + + /// The given request can't be cloned and thus can't be retried. + #[error("The request cannot be cloned")] + UnableToCloneRequest, } /// Internal representation of errors. diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index 64f83cc2..a13e2989 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -14,6 +14,10 @@ use std::{convert::TryFrom, fmt::Debug, sync::Arc}; +#[cfg(not(test))] +use backoff::{tokio::retry, Error as RetryError, ExponentialBackoff}; +#[cfg(not(test))] +use http::StatusCode; use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse}; use reqwest::{Client, Response}; use tracing::trace; @@ -43,7 +47,7 @@ pub trait HttpSend: AsyncTraitDeps { /// /// ``` /// use std::convert::TryFrom; - /// use matrix_sdk::{HttpSend, Result, async_trait}; + /// use matrix_sdk::{HttpSend, async_trait, HttpError}; /// /// #[derive(Debug)] /// struct Client(reqwest::Client); @@ -52,7 +56,7 @@ pub trait HttpSend: AsyncTraitDeps { /// async fn response_to_http_response( /// &self, /// mut response: reqwest::Response, - /// ) -> Result>> { + /// ) -> Result>, HttpError> { /// // Convert the reqwest response to a http one. /// todo!() /// } @@ -60,7 +64,7 @@ pub trait HttpSend: AsyncTraitDeps { /// /// #[async_trait] /// impl HttpSend for Client { - /// async fn send_request(&self, request: http::Request>) -> Result>> { + /// async fn send_request(&self, request: http::Request>) -> Result>, HttpError> { /// Ok(self /// .response_to_http_response( /// self.0 @@ -212,6 +216,53 @@ async fn response_to_http_response( Ok(http_builder.body(body).unwrap()) } +#[cfg(test)] +async fn send_request( + client: &Client, + request: http::Request>, +) -> Result>, HttpError> { + let request = reqwest::Request::try_from(request)?; + let response = client.execute(request).await?; + + Ok(response_to_http_response(response).await?) +} + +#[cfg(not(test))] +async fn send_request( + client: &Client, + request: http::Request>, +) -> Result>, HttpError> { + let backoff = ExponentialBackoff::default(); + // TODO set a sensible timeout for the request here. + let request = &reqwest::Request::try_from(request)?; + + let request = || async move { + let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?; + + let response = client + .execute(request) + .await + .map_err(|e| RetryError::Transient(HttpError::Reqwest(e)))?; + + let status_code = response.status(); + // TODO TOO_MANY_REQUESTS will have a retry timeout which we should + // use. + if status_code.is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS { + return Err(RetryError::Transient(HttpError::Server(status_code))); + } + + let response = response_to_http_response(response) + .await + .map_err(|e| RetryError::Permanent(HttpError::Reqwest(e)))?; + + Ok(response) + }; + + let response = retry(backoff, request).await?; + + Ok(response) +} + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl HttpSend for Client { @@ -219,9 +270,6 @@ impl HttpSend for Client { &self, request: http::Request>, ) -> Result>, HttpError> { - Ok( - response_to_http_response(self.execute(reqwest::Request::try_from(request)?).await?) - .await?, - ) + send_request(&self, request).await } } From 6a4ac8f3611f0b535623e13ca9fe782273d05bcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Sun, 31 Jan 2021 21:12:00 +0100 Subject: [PATCH 3/9] matrix-sdk: Replace some unwraps with expects. --- matrix_sdk/src/http_client.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index a13e2989..0b692856 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -182,7 +182,8 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result a.clone(), - None => HeaderValue::from_str(&format!("matrix-rust-sdk {}", crate::VERSION)).unwrap(), + None => HeaderValue::from_str(&format!("matrix-rust-sdk {}", crate::VERSION)) + .expect("Can't construct the version header"), }; headers.insert(reqwest::header::USER_AGENT, user_agent); @@ -203,7 +204,9 @@ async fn response_to_http_response( let status = response.status(); let mut http_builder = HttpResponse::builder().status(status); - let headers = http_builder.headers_mut().unwrap(); + let headers = http_builder + .headers_mut() + .expect("Can't get the response builder headers"); for (k, v) in response.headers_mut().drain() { if let Some(key) = k { @@ -213,7 +216,9 @@ async fn response_to_http_response( let body = response.bytes().await?.as_ref().to_owned(); - Ok(http_builder.body(body).unwrap()) + Ok(http_builder + .body(body) + .expect("Can't construct a response using the given body")) } #[cfg(test)] From a551ae2beedf2049755b3ac939cef8eb555bb98d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 1 Feb 2021 17:15:29 +0100 Subject: [PATCH 4/9] matrix-sdk: Add sensible connection and request timeouts This sets the default * connection timeout to 5s * request timeout to 10s * request timeout for syncs to the sync timeout + 10s * request timeout for uploads to be based on 1Mbps upload speed expectations --- matrix_sdk/examples/get_profiles.rs | 2 +- matrix_sdk/src/client.rs | 96 +++++++++++++++++------------ matrix_sdk/src/http_client.rs | 37 ++++++++--- matrix_sdk/src/sas.rs | 2 +- 4 files changed, 86 insertions(+), 51 deletions(-) diff --git a/matrix_sdk/examples/get_profiles.rs b/matrix_sdk/examples/get_profiles.rs index c75d25b4..4d20bb76 100644 --- a/matrix_sdk/examples/get_profiles.rs +++ b/matrix_sdk/examples/get_profiles.rs @@ -19,7 +19,7 @@ async fn get_profile(client: Client, mxid: &UserId) -> MatrixResult let request = profile::get_profile::Request::new(mxid); // Start the request using matrix_sdk::Client::send - let resp = client.send(request).await?; + let resp = client.send(request, None).await?; // Use the response and construct a UserProfile struct. // See https://docs.rs/ruma-client-api/0.9.0/ruma_client_api/r0/profile/get_profile/struct.Response.html diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 6cdb7000..0d4abfe2 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -132,6 +132,10 @@ use crate::{ }; const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); +/// Give the sync a bit more time than the default request timeout does. +const SYNC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); +/// A conservative upload speed of 1Mbps +const DEFAULT_UPLOAD_SPEED: u64 = 125_000; /// An async/await enabled Matrix client. /// @@ -452,7 +456,7 @@ impl Client { pub async fn display_name(&self) -> Result> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = get_display_name::Request::new(&user_id); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(response.displayname) } @@ -475,7 +479,7 @@ impl Client { pub async fn set_display_name(&self, name: Option<&str>) -> Result<()> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = set_display_name::Request::new(&user_id, name); - self.send(request).await?; + self.send(request, None).await?; Ok(()) } @@ -500,7 +504,7 @@ impl Client { pub async fn avatar_url(&self) -> Result> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = get_avatar_url::Request::new(&user_id); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(response.avatar_url) } @@ -513,7 +517,7 @@ impl Client { pub async fn set_avatar_url(&self, url: Option<&str>) -> Result<()> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = set_avatar_url::Request::new(&user_id, url); - self.send(request).await?; + self.send(request, None).await?; Ok(()) } @@ -672,7 +676,7 @@ impl Client { } ); - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client.receive_login_response(&response).await?; Ok(response) @@ -734,7 +738,7 @@ impl Client { info!("Registering to {}", self.homeserver); let request = registration.into(); - self.send(request).await + self.send(request, None).await } /// Get or upload a sync filter. @@ -748,7 +752,7 @@ impl Client { } else { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = FilterUploadRequest::new(&user_id, definition); - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client .receive_filter_upload(filter_name, &response) @@ -768,7 +772,7 @@ impl Client { /// * `room_id` - The `RoomId` of the room to be joined. pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result { let request = join_room_by_id::Request::new(room_id); - self.send(request).await + self.send(request, None).await } /// Join a room by `RoomId`. @@ -788,7 +792,7 @@ impl Client { let request = assign!(join_room_by_id_or_alias::Request::new(alias), { server_name: server_names, }); - self.send(request).await + self.send(request, None).await } /// Forget a room by `RoomId`. @@ -800,7 +804,7 @@ impl Client { /// * `room_id` - The `RoomId` of the room to be forget. pub async fn forget_room_by_id(&self, room_id: &RoomId) -> Result { let request = forget_room::Request::new(room_id); - self.send(request).await + self.send(request, None).await } /// Ban a user from a room by `RoomId` and `UserId`. @@ -821,7 +825,7 @@ impl Client { reason: Option<&str>, ) -> Result { let request = assign!(ban_user::Request::new(room_id, user_id), { reason }); - self.send(request).await + self.send(request, None).await } /// Kick a user out of the specified room. @@ -842,7 +846,7 @@ impl Client { reason: Option<&str>, ) -> Result { let request = assign!(kick_user::Request::new(room_id, user_id), { reason }); - self.send(request).await + self.send(request, None).await } /// Leave the specified room. @@ -854,7 +858,7 @@ impl Client { /// * `room_id` - The `RoomId` of the room to leave. pub async fn leave_room(&self, room_id: &RoomId) -> Result { let request = leave_room::Request::new(room_id); - self.send(request).await + self.send(request, None).await } /// Invite the specified user by `UserId` to the given room. @@ -874,7 +878,7 @@ impl Client { let recipient = InvitationRecipient::UserId { user_id }; let request = invite_user::Request::new(room_id, recipient); - self.send(request).await + self.send(request, None).await } /// Invite the specified user by third party id to the given room. @@ -893,7 +897,7 @@ impl Client { ) -> Result { let recipient = InvitationRecipient::ThirdPartyId(invite_id); let request = invite_user::Request::new(room_id, recipient); - self.send(request).await + self.send(request, None).await } /// Search the homeserver's directory of public rooms. @@ -939,7 +943,7 @@ impl Client { since, server, }); - self.send(request).await + self.send(request, None).await } /// Search the homeserver's directory of public rooms with a filter. @@ -977,7 +981,7 @@ impl Client { room_search: impl Into>, ) -> Result { let request = room_search.into(); - self.send(request).await + self.send(request, None).await } /// Create a room using the `RoomBuilder` and send the request. @@ -1009,7 +1013,7 @@ impl Client { room: impl Into>, ) -> Result { let request = room.into(); - self.send(request).await + self.send(request, None).await } /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and returns @@ -1044,8 +1048,8 @@ impl Client { &self, request: impl Into>, ) -> Result { - let req = request.into(); - self.send(req).await + let request = request.into(); + self.send(request, None).await } /// Send a request to notify the room of a user typing. @@ -1088,7 +1092,7 @@ impl Client { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = TypingRequest::new(&user_id, room_id, typing.into()); - self.send(request).await + self.send(request, None).await } /// Send a request to notify the room the user has read specific event. @@ -1107,7 +1111,7 @@ impl Client { ) -> Result { let request = create_receipt::Request::new(room_id, create_receipt::ReceiptType::Read, event_id); - self.send(request).await + self.send(request, None).await } /// Send a request to notify the room user has read up to specific event. @@ -1130,7 +1134,7 @@ impl Client { let request = assign!(set_read_marker::Request::new(room_id, fully_read), { read_receipt }); - self.send(request).await + self.send(request, None).await } /// Share a group session for the given room. @@ -1261,7 +1265,7 @@ impl Client { let txn_id = txn_id.unwrap_or_else(Uuid::new_v4).to_string(); let request = send_message_event::Request::new(&room_id, &txn_id, &content); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(response) } @@ -1448,11 +1452,13 @@ impl Client { let mut data = Vec::new(); reader.read_to_end(&mut data)?; + let timeout = Duration::from_secs(data.len() as u64 / DEFAULT_UPLOAD_SPEED); + let request = assign!(create_content::Request::new(data), { content_type: Some(content_type.essence_str()), }); - Ok(self.http_client.upload(request).await?) + Ok(self.http_client.upload(request, Some(timeout)).await?) } /// Send an arbitrary request to the server, without updating client state. @@ -1466,6 +1472,9 @@ impl Client { /// /// * `request` - A filled out and valid request for the endpoint to be hit /// + /// * `timeout` - An optional request timeout setting, this overrides the + /// default request setting if one was set. + /// /// # Example /// /// ```no_run @@ -1486,18 +1495,22 @@ impl Client { /// let request = profile::get_profile::Request::new(&user_id); /// /// // Start the request using Client::send() - /// let response = client.send(request).await.unwrap(); + /// let response = client.send(request, None).await.unwrap(); /// /// // Check the corresponding Response struct to find out what types are /// // returned /// # }) /// ``` - pub async fn send(&self, request: Request) -> Result + pub async fn send( + &self, + request: Request, + timeout: Option, + ) -> Result where Request: OutgoingRequest + Debug, HttpError: From>, { - Ok(self.http_client.send(request).await?) + Ok(self.http_client.send(request, timeout).await?) } #[cfg(feature = "encryption")] @@ -1512,7 +1525,7 @@ impl Client { request.messages.clone(), ); - self.send(request).await + self.send(request, None).await } /// Get information of all our own devices. @@ -1541,7 +1554,7 @@ impl Client { pub async fn devices(&self) -> Result { let request = get_devices::Request::new(); - self.send(request).await + self.send(request, None).await } /// Delete the given devices from the server. @@ -1606,13 +1619,13 @@ impl Client { let mut request = delete_devices::Request::new(devices); request.auth = auth_data; - self.send(request).await + self.send(request, None).await } /// Get the room members for the given room. pub async fn room_members(&self, room_id: &RoomId) -> Result { let request = get_member_events::Request::new(room_id); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(self.base_client.receive_members(room_id, &response).await?) } @@ -1638,7 +1651,12 @@ impl Client { timeout: sync_settings.timeout, }); - let response = self.send(request).await?; + let timeout = sync_settings + .timeout + .unwrap_or_else(|| Duration::from_secs(0)) + + SYNC_REQUEST_TIMEOUT; + + let response = self.send(request, Some(timeout)).await?; Ok(self.base_client.receive_sync_response(response).await?) } @@ -1779,7 +1797,7 @@ impl Client { } OutgoingRequests::SignatureUpload(request) => { // TODO remove this unwrap. - if let Ok(resp) = self.send(request.clone()).await { + if let Ok(resp) = self.send(request.clone(), None).await { self.base_client .mark_request_as_sent(&r.request_id(), &resp) .await @@ -1839,7 +1857,7 @@ impl Client { let _lock = self.key_claim_lock.lock().await; if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? { - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client .mark_request_as_sent(&request_id, &response) .await?; @@ -1898,7 +1916,7 @@ impl Client { request.one_time_keys.as_ref().map_or(0, |k| k.len()) ); - let response = self.send(request.clone()).await?; + let response = self.send(request.clone(), None).await?; self.base_client .mark_request_as_sent(request_id, &response) .await?; @@ -1927,7 +1945,7 @@ impl Client { ) -> Result { let request = assign!(get_keys::Request::new(), { device_keys }); - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client .mark_request_as_sent(request_id, &response) .await?; @@ -2080,8 +2098,8 @@ impl Client { user_signing_key: request.user_signing_key, }); - self.send(request).await?; - self.send(signature_request).await?; + self.send(request, None).await?; + self.send(signature_request, None).await?; Ok(()) } diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index 0b692856..d995e1eb 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -24,12 +24,15 @@ use tracing::trace; use url::Url; use matrix_sdk_common::{ - api::r0::media::create_content, async_trait, locks::RwLock, AsyncTraitDeps, AuthScheme, - FromHttpResponseError, + api::r0::media::create_content, async_trait, instant::Duration, locks::RwLock, AsyncTraitDeps, + AuthScheme, FromHttpResponseError, }; use crate::{error::HttpError, ClientConfig, OutgoingRequest, Session}; +const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + /// Abstraction around the http layer. The allows implementors to use different /// http libraries. #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -78,6 +81,7 @@ pub trait HttpSend: AsyncTraitDeps { async fn send_request( &self, request: http::Request>, + timeout: Option, ) -> Result>, HttpError>; } @@ -94,6 +98,7 @@ impl HttpClient { request: Request, session: Arc>>, content_type: Option, + timeout: Option, ) -> Result>, HttpError> { let mut request = { let read_guard; @@ -122,15 +127,16 @@ impl HttpClient { } } - self.inner.send_request(request).await + self.inner.send_request(request, timeout).await } pub async fn upload( &self, request: create_content::Request<'_>, + timeout: Option, ) -> Result { let response = self - .send_request(request, self.session.clone(), None) + .send_request(request, self.session.clone(), None, timeout) .await?; Ok(create_content::Response::try_from(response)?) } @@ -138,6 +144,7 @@ impl HttpClient { pub async fn send( &self, request: Request, + timeout: Option, ) -> Result where Request: OutgoingRequest + Debug, @@ -145,7 +152,7 @@ impl HttpClient { { let content_type = HeaderValue::from_static("application/json"); let response = self - .send_request(request, self.session.clone(), Some(content_type)) + .send_request(request, self.session.clone(), Some(content_type), timeout) .await?; trace!("Got response: {:?}", response); @@ -164,7 +171,7 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result http_client.timeout(x), - None => http_client, + None => http_client.timeout(DEFAULT_REQUEST_TIMEOUT), }; let http_client = if config.disable_ssl_verification { @@ -188,7 +195,9 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result>, + _: Option, ) -> Result>, HttpError> { let request = reqwest::Request::try_from(request)?; let response = client.execute(request).await?; @@ -236,10 +246,16 @@ async fn send_request( async fn send_request( client: &Client, request: http::Request>, + timeout: Option, ) -> Result>, HttpError> { let backoff = ExponentialBackoff::default(); - // TODO set a sensible timeout for the request here. - let request = &reqwest::Request::try_from(request)?; + let mut request = reqwest::Request::try_from(request)?; + + if let Some(timeout) = timeout { + *request.timeout_mut() = Some(timeout); + } + + let request = &request; let request = || async move { let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?; @@ -274,7 +290,8 @@ impl HttpSend for Client { async fn send_request( &self, request: http::Request>, + timeout: Option, ) -> Result>, HttpError> { - send_request(&self, request).await + send_request(&self, request, timeout).await } } diff --git a/matrix_sdk/src/sas.rs b/matrix_sdk/src/sas.rs index ccbc4224..389633f7 100644 --- a/matrix_sdk/src/sas.rs +++ b/matrix_sdk/src/sas.rs @@ -54,7 +54,7 @@ impl Sas { } if let Some(s) = signature { - self.client.send(s).await?; + self.client.send(s, None).await?; } Ok(()) From 2e2d9b33a4d0d866e37d7374a248edd726c0141e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 1 Feb 2021 17:30:43 +0100 Subject: [PATCH 5/9] contrib: Add a mitmproxy script which can be used to test out request retrying --- contrib/mitmproxy/failures.py | 47 +++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 contrib/mitmproxy/failures.py diff --git a/contrib/mitmproxy/failures.py b/contrib/mitmproxy/failures.py new file mode 100644 index 00000000..10dc1898 --- /dev/null +++ b/contrib/mitmproxy/failures.py @@ -0,0 +1,47 @@ +""" +A mitmproxy script that introduces certain request failures in a deterministic +way. + +Used mainly for Matrix style requests. + +To run execute it with mitmproxy: + + >>> mitmproxy -s failures.py` + +""" +import time +import json + +from mitmproxy import http +from mitmproxy.script import concurrent + +REQUEST_COUNT = 0 + + +@concurrent +def request(flow): + global REQUEST_COUNT + + REQUEST_COUNT += 1 + + if REQUEST_COUNT % 2 == 0: + return + elif REQUEST_COUNT % 3 == 0: + flow.response = http.HTTPResponse.make( + 500, + b"Gateway error", + ) + elif REQUEST_COUNT % 7 == 0: + if "sync" in flow.request.pretty_url: + time.sleep(60) + else: + time.sleep(30) + else: + flow.response = http.HTTPResponse.make( + 429, + json.dumps({ + "errcode": "M_LIMIT_EXCEEDED", + "error": "Too many requests", + "retry_after_ms": 2000 + }) + ) From 19e98849639273b66cc292e0c1375f255cbb541d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 1 Feb 2021 17:58:03 +0100 Subject: [PATCH 6/9] matrix-sdk: Update for the latest backoff changes --- matrix_sdk/Cargo.toml | 1 + matrix_sdk/src/http_client.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index d9e8d358..bca1f66b 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -53,6 +53,7 @@ default_features = false [dependencies.backoff] git = "https://github.com/ihrwein/backoff" features = ["tokio"] +rev = "fa3fb91431729ce871d29c62b93425b8aec740f4" [dependencies.tracing-futures] version = "0.2.4" diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index d995e1eb..1a66203e 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -15,7 +15,7 @@ use std::{convert::TryFrom, fmt::Debug, sync::Arc}; #[cfg(not(test))] -use backoff::{tokio::retry, Error as RetryError, ExponentialBackoff}; +use backoff::{future::retry, Error as RetryError, ExponentialBackoff}; #[cfg(not(test))] use http::StatusCode; use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse}; From f3d4f6aab460bf990c6e1a11f4b1efd750143244 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 1 Feb 2021 19:24:29 +0100 Subject: [PATCH 7/9] matrix-sdk: Fix our HttpClient trait implementation example --- matrix_sdk/src/http_client.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index 1a66203e..60a89e97 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -51,6 +51,7 @@ pub trait HttpSend: AsyncTraitDeps { /// ``` /// use std::convert::TryFrom; /// use matrix_sdk::{HttpSend, async_trait, HttpError}; + /// # use std::time::Duration; /// /// #[derive(Debug)] /// struct Client(reqwest::Client); @@ -67,7 +68,11 @@ pub trait HttpSend: AsyncTraitDeps { /// /// #[async_trait] /// impl HttpSend for Client { - /// async fn send_request(&self, request: http::Request>) -> Result>, HttpError> { + /// async fn send_request( + /// &self, + /// request: http::Request>, + /// timeout: Option, + /// ) -> Result>, HttpError> { /// Ok(self /// .response_to_http_response( /// self.0 From ca7117af2b34540b75e204e864ef3a458c9e17cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 1 Feb 2021 21:56:15 +0100 Subject: [PATCH 8/9] matrix-sdk: Clamp the request timeout for uploads to a sensible value --- matrix_sdk/src/client.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 0d4abfe2..016da325 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -136,6 +136,8 @@ const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); const SYNC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); /// A conservative upload speed of 1Mbps const DEFAULT_UPLOAD_SPEED: u64 = 125_000; +/// 5 min minimal upload request timeout, used to clamp the request timeout. +const MIN_UPLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 5); /// An async/await enabled Matrix client. /// @@ -1452,7 +1454,10 @@ impl Client { let mut data = Vec::new(); reader.read_to_end(&mut data)?; - let timeout = Duration::from_secs(data.len() as u64 / DEFAULT_UPLOAD_SPEED); + let timeout = std::cmp::max( + Duration::from_secs(data.len() as u64 / DEFAULT_UPLOAD_SPEED), + MIN_UPLOAD_REQUEST_TIMEOUT, + ); let request = assign!(create_content::Request::new(data), { content_type: Some(content_type.essence_str()), From 36e3039d73b51e64e39e247c98d316883e37beb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Sun, 7 Feb 2021 12:53:06 +0100 Subject: [PATCH 9/9] matrix-sdk: Disable request retrying for wasm for now Backoff supports the retry method for futures only for non-wasm targets for now, thus we're going to disable it until that changes. --- matrix_sdk/Cargo.toml | 2 +- matrix_sdk/src/http_client.rs | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index bca1f66b..95ffc18b 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -50,7 +50,7 @@ default_features = false version = "0.11.0" default_features = false -[dependencies.backoff] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.backoff] git = "https://github.com/ihrwein/backoff" features = ["tokio"] rev = "fa3fb91431729ce871d29c62b93425b8aec740f4" diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index 60a89e97..cd7d6265 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -14,9 +14,9 @@ use std::{convert::TryFrom, fmt::Debug, sync::Arc}; -#[cfg(not(test))] +#[cfg(all(not(test), not(target_arch = "wasm32")))] use backoff::{future::retry, Error as RetryError, ExponentialBackoff}; -#[cfg(not(test))] +#[cfg(all(not(test), not(target_arch = "wasm32")))] use http::StatusCode; use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse}; use reqwest::{Client, Response}; @@ -30,7 +30,9 @@ use matrix_sdk_common::{ use crate::{error::HttpError, ClientConfig, OutgoingRequest, Session}; +#[cfg(not(target_arch = "wasm32"))] const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); +#[cfg(not(target_arch = "wasm32"))] const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// Abstraction around the http layer. The allows implementors to use different @@ -235,7 +237,7 @@ async fn response_to_http_response( .expect("Can't construct a response using the given body")) } -#[cfg(test)] +#[cfg(any(test, target_arch = "wasm32"))] async fn send_request( client: &Client, request: http::Request>, @@ -247,7 +249,7 @@ async fn send_request( Ok(response_to_http_response(response).await?) } -#[cfg(not(test))] +#[cfg(all(not(test), not(target_arch = "wasm32")))] async fn send_request( client: &Client, request: http::Request>,