From 42ec456abf3415724e05df00d4d33e3f5d3de65a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Sun, 31 Jan 2021 21:10:30 +0100 Subject: [PATCH] matrix-sdk: Add initial support for request retrying --- matrix_sdk/Cargo.toml | 4 +++ matrix_sdk/src/error.rs | 9 +++++ matrix_sdk/src/http_client.rs | 62 +++++++++++++++++++++++++++++++---- 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index 648e358c..d9e8d358 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -50,6 +50,10 @@ default_features = false version = "0.11.0" default_features = false +[dependencies.backoff] +git = "https://github.com/ihrwein/backoff" +features = ["tokio"] + [dependencies.tracing-futures] version = "0.2.4" default-features = false diff --git a/matrix_sdk/src/error.rs b/matrix_sdk/src/error.rs index e4611628..1c2f9e57 100644 --- a/matrix_sdk/src/error.rs +++ b/matrix_sdk/src/error.rs @@ -14,6 +14,7 @@ //! Error conditions. +use http::StatusCode; use matrix_sdk_base::{Error as MatrixError, StoreError}; use matrix_sdk_common::{ api::{ @@ -64,6 +65,14 @@ pub enum HttpError { /// represents an error with information about how to authenticate the user. #[error(transparent)] UiaaError(#[from] FromHttpResponseError), + + /// The server returned a status code that should be retried. + #[error("Server returned an error {0}")] + Server(StatusCode), + + /// The given request can't be cloned and thus can't be retried. + #[error("The request cannot be cloned")] + UnableToCloneRequest, } /// Internal representation of errors. diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index 64f83cc2..a13e2989 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -14,6 +14,10 @@ use std::{convert::TryFrom, fmt::Debug, sync::Arc}; +#[cfg(not(test))] +use backoff::{tokio::retry, Error as RetryError, ExponentialBackoff}; +#[cfg(not(test))] +use http::StatusCode; use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse}; use reqwest::{Client, Response}; use tracing::trace; @@ -43,7 +47,7 @@ pub trait HttpSend: AsyncTraitDeps { /// /// ``` /// use std::convert::TryFrom; - /// use matrix_sdk::{HttpSend, Result, async_trait}; + /// use matrix_sdk::{HttpSend, async_trait, HttpError}; /// /// #[derive(Debug)] /// struct Client(reqwest::Client); @@ -52,7 +56,7 @@ pub trait HttpSend: AsyncTraitDeps { /// async fn response_to_http_response( /// &self, /// mut response: reqwest::Response, - /// ) -> Result>> { + /// ) -> Result>, HttpError> { /// // Convert the reqwest response to a http one. /// todo!() /// } @@ -60,7 +64,7 @@ pub trait HttpSend: AsyncTraitDeps { /// /// #[async_trait] /// impl HttpSend for Client { - /// async fn send_request(&self, request: http::Request>) -> Result>> { + /// async fn send_request(&self, request: http::Request>) -> Result>, HttpError> { /// Ok(self /// .response_to_http_response( /// self.0 @@ -212,6 +216,53 @@ async fn response_to_http_response( Ok(http_builder.body(body).unwrap()) } +#[cfg(test)] +async fn send_request( + client: &Client, + request: http::Request>, +) -> Result>, HttpError> { + let request = reqwest::Request::try_from(request)?; + let response = client.execute(request).await?; + + Ok(response_to_http_response(response).await?) +} + +#[cfg(not(test))] +async fn send_request( + client: &Client, + request: http::Request>, +) -> Result>, HttpError> { + let backoff = ExponentialBackoff::default(); + // TODO set a sensible timeout for the request here. + let request = &reqwest::Request::try_from(request)?; + + let request = || async move { + let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?; + + let response = client + .execute(request) + .await + .map_err(|e| RetryError::Transient(HttpError::Reqwest(e)))?; + + let status_code = response.status(); + // TODO TOO_MANY_REQUESTS will have a retry timeout which we should + // use. + if status_code.is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS { + return Err(RetryError::Transient(HttpError::Server(status_code))); + } + + let response = response_to_http_response(response) + .await + .map_err(|e| RetryError::Permanent(HttpError::Reqwest(e)))?; + + Ok(response) + }; + + let response = retry(backoff, request).await?; + + Ok(response) +} + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl HttpSend for Client { @@ -219,9 +270,6 @@ impl HttpSend for Client { &self, request: http::Request>, ) -> Result>, HttpError> { - Ok( - response_to_http_response(self.execute(reqwest::Request::try_from(request)?).await?) - .await?, - ) + send_request(&self, request).await } }