matrix-sdk: Add initial support for request retrying

master
Damir Jelić 2021-01-31 21:10:30 +01:00
parent 585ca9fdf7
commit 42ec456abf
3 changed files with 68 additions and 7 deletions

View File

@ -50,6 +50,10 @@ default_features = false
version = "0.11.0" version = "0.11.0"
default_features = false default_features = false
[dependencies.backoff]
git = "https://github.com/ihrwein/backoff"
features = ["tokio"]
[dependencies.tracing-futures] [dependencies.tracing-futures]
version = "0.2.4" version = "0.2.4"
default-features = false default-features = false

View File

@ -14,6 +14,7 @@
//! Error conditions. //! Error conditions.
use http::StatusCode;
use matrix_sdk_base::{Error as MatrixError, StoreError}; use matrix_sdk_base::{Error as MatrixError, StoreError};
use matrix_sdk_common::{ use matrix_sdk_common::{
api::{ api::{
@ -64,6 +65,14 @@ pub enum HttpError {
/// represents an error with information about how to authenticate the user. /// represents an error with information about how to authenticate the user.
#[error(transparent)] #[error(transparent)]
UiaaError(#[from] FromHttpResponseError<UiaaError>), UiaaError(#[from] FromHttpResponseError<UiaaError>),
/// The server returned a status code that should be retried.
#[error("Server returned an error {0}")]
Server(StatusCode),
/// The given request can't be cloned and thus can't be retried.
#[error("The request cannot be cloned")]
UnableToCloneRequest,
} }
/// Internal representation of errors. /// Internal representation of errors.

View File

@ -14,6 +14,10 @@
use std::{convert::TryFrom, fmt::Debug, sync::Arc}; use std::{convert::TryFrom, fmt::Debug, sync::Arc};
#[cfg(not(test))]
use backoff::{tokio::retry, Error as RetryError, ExponentialBackoff};
#[cfg(not(test))]
use http::StatusCode;
use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse}; use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse};
use reqwest::{Client, Response}; use reqwest::{Client, Response};
use tracing::trace; use tracing::trace;
@ -43,7 +47,7 @@ pub trait HttpSend: AsyncTraitDeps {
/// ///
/// ``` /// ```
/// use std::convert::TryFrom; /// use std::convert::TryFrom;
/// use matrix_sdk::{HttpSend, Result, async_trait}; /// use matrix_sdk::{HttpSend, async_trait, HttpError};
/// ///
/// #[derive(Debug)] /// #[derive(Debug)]
/// struct Client(reqwest::Client); /// struct Client(reqwest::Client);
@ -52,7 +56,7 @@ pub trait HttpSend: AsyncTraitDeps {
/// async fn response_to_http_response( /// async fn response_to_http_response(
/// &self, /// &self,
/// mut response: reqwest::Response, /// mut response: reqwest::Response,
/// ) -> Result<http::Response<Vec<u8>>> { /// ) -> Result<http::Response<Vec<u8>>, HttpError> {
/// // Convert the reqwest response to a http one. /// // Convert the reqwest response to a http one.
/// todo!() /// todo!()
/// } /// }
@ -60,7 +64,7 @@ pub trait HttpSend: AsyncTraitDeps {
/// ///
/// #[async_trait] /// #[async_trait]
/// impl HttpSend for Client { /// impl HttpSend for Client {
/// async fn send_request(&self, request: http::Request<Vec<u8>>) -> Result<http::Response<Vec<u8>>> { /// async fn send_request(&self, request: http::Request<Vec<u8>>) -> Result<http::Response<Vec<u8>>, HttpError> {
/// Ok(self /// Ok(self
/// .response_to_http_response( /// .response_to_http_response(
/// self.0 /// self.0
@ -212,6 +216,53 @@ async fn response_to_http_response(
Ok(http_builder.body(body).unwrap()) Ok(http_builder.body(body).unwrap())
} }
#[cfg(test)]
async fn send_request(
client: &Client,
request: http::Request<Vec<u8>>,
) -> Result<http::Response<Vec<u8>>, HttpError> {
let request = reqwest::Request::try_from(request)?;
let response = client.execute(request).await?;
Ok(response_to_http_response(response).await?)
}
#[cfg(not(test))]
async fn send_request(
client: &Client,
request: http::Request<Vec<u8>>,
) -> Result<http::Response<Vec<u8>>, HttpError> {
let backoff = ExponentialBackoff::default();
// TODO set a sensible timeout for the request here.
let request = &reqwest::Request::try_from(request)?;
let request = || async move {
let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?;
let response = client
.execute(request)
.await
.map_err(|e| RetryError::Transient(HttpError::Reqwest(e)))?;
let status_code = response.status();
// TODO TOO_MANY_REQUESTS will have a retry timeout which we should
// use.
if status_code.is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS {
return Err(RetryError::Transient(HttpError::Server(status_code)));
}
let response = response_to_http_response(response)
.await
.map_err(|e| RetryError::Permanent(HttpError::Reqwest(e)))?;
Ok(response)
};
let response = retry(backoff, request).await?;
Ok(response)
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl HttpSend for Client { impl HttpSend for Client {
@ -219,9 +270,6 @@ impl HttpSend for Client {
&self, &self,
request: http::Request<Vec<u8>>, request: http::Request<Vec<u8>>,
) -> Result<http::Response<Vec<u8>>, HttpError> { ) -> Result<http::Response<Vec<u8>>, HttpError> {
Ok( send_request(&self, request).await
response_to_http_response(self.execute(reqwest::Request::try_from(request)?).await?)
.await?,
)
} }
} }