matrix-sdk: Add sensible connection and request timeouts
This sets the default * connection timeout to 5s * request timeout to 10s * request timeout for syncs to the sync timeout + 10s * request timeout for uploads to be based on 1Mbps upload speed expectations
This commit is contained in:
parent
6a4ac8f361
commit
a551ae2bee
4 changed files with 86 additions and 51 deletions
|
@ -19,7 +19,7 @@ async fn get_profile(client: Client, mxid: &UserId) -> MatrixResult<UserProfile>
|
|||
let request = profile::get_profile::Request::new(mxid);
|
||||
|
||||
// Start the request using matrix_sdk::Client::send
|
||||
let resp = client.send(request).await?;
|
||||
let resp = client.send(request, None).await?;
|
||||
|
||||
// Use the response and construct a UserProfile struct.
|
||||
// See https://docs.rs/ruma-client-api/0.9.0/ruma_client_api/r0/profile/get_profile/struct.Response.html
|
||||
|
|
|
@ -132,6 +132,10 @@ use crate::{
|
|||
};
|
||||
|
||||
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
/// Give the sync a bit more time than the default request timeout does.
|
||||
const SYNC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
/// A conservative upload speed of 1Mbps
|
||||
const DEFAULT_UPLOAD_SPEED: u64 = 125_000;
|
||||
|
||||
/// An async/await enabled Matrix client.
|
||||
///
|
||||
|
@ -452,7 +456,7 @@ impl Client {
|
|||
pub async fn display_name(&self) -> Result<Option<String>> {
|
||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let request = get_display_name::Request::new(&user_id);
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
Ok(response.displayname)
|
||||
}
|
||||
|
||||
|
@ -475,7 +479,7 @@ impl Client {
|
|||
pub async fn set_display_name(&self, name: Option<&str>) -> Result<()> {
|
||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let request = set_display_name::Request::new(&user_id, name);
|
||||
self.send(request).await?;
|
||||
self.send(request, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -500,7 +504,7 @@ impl Client {
|
|||
pub async fn avatar_url(&self) -> Result<Option<String>> {
|
||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let request = get_avatar_url::Request::new(&user_id);
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
Ok(response.avatar_url)
|
||||
}
|
||||
|
||||
|
@ -513,7 +517,7 @@ impl Client {
|
|||
pub async fn set_avatar_url(&self, url: Option<&str>) -> Result<()> {
|
||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let request = set_avatar_url::Request::new(&user_id, url);
|
||||
self.send(request).await?;
|
||||
self.send(request, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -672,7 +676,7 @@ impl Client {
|
|||
}
|
||||
);
|
||||
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
self.base_client.receive_login_response(&response).await?;
|
||||
|
||||
Ok(response)
|
||||
|
@ -734,7 +738,7 @@ impl Client {
|
|||
info!("Registering to {}", self.homeserver);
|
||||
|
||||
let request = registration.into();
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Get or upload a sync filter.
|
||||
|
@ -748,7 +752,7 @@ impl Client {
|
|||
} else {
|
||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let request = FilterUploadRequest::new(&user_id, definition);
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
|
||||
self.base_client
|
||||
.receive_filter_upload(filter_name, &response)
|
||||
|
@ -768,7 +772,7 @@ impl Client {
|
|||
/// * `room_id` - The `RoomId` of the room to be joined.
|
||||
pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<join_room_by_id::Response> {
|
||||
let request = join_room_by_id::Request::new(room_id);
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Join a room by `RoomId`.
|
||||
|
@ -788,7 +792,7 @@ impl Client {
|
|||
let request = assign!(join_room_by_id_or_alias::Request::new(alias), {
|
||||
server_name: server_names,
|
||||
});
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Forget a room by `RoomId`.
|
||||
|
@ -800,7 +804,7 @@ impl Client {
|
|||
/// * `room_id` - The `RoomId` of the room to be forget.
|
||||
pub async fn forget_room_by_id(&self, room_id: &RoomId) -> Result<forget_room::Response> {
|
||||
let request = forget_room::Request::new(room_id);
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Ban a user from a room by `RoomId` and `UserId`.
|
||||
|
@ -821,7 +825,7 @@ impl Client {
|
|||
reason: Option<&str>,
|
||||
) -> Result<ban_user::Response> {
|
||||
let request = assign!(ban_user::Request::new(room_id, user_id), { reason });
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Kick a user out of the specified room.
|
||||
|
@ -842,7 +846,7 @@ impl Client {
|
|||
reason: Option<&str>,
|
||||
) -> Result<kick_user::Response> {
|
||||
let request = assign!(kick_user::Request::new(room_id, user_id), { reason });
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Leave the specified room.
|
||||
|
@ -854,7 +858,7 @@ impl Client {
|
|||
/// * `room_id` - The `RoomId` of the room to leave.
|
||||
pub async fn leave_room(&self, room_id: &RoomId) -> Result<leave_room::Response> {
|
||||
let request = leave_room::Request::new(room_id);
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Invite the specified user by `UserId` to the given room.
|
||||
|
@ -874,7 +878,7 @@ impl Client {
|
|||
let recipient = InvitationRecipient::UserId { user_id };
|
||||
|
||||
let request = invite_user::Request::new(room_id, recipient);
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Invite the specified user by third party id to the given room.
|
||||
|
@ -893,7 +897,7 @@ impl Client {
|
|||
) -> Result<invite_user::Response> {
|
||||
let recipient = InvitationRecipient::ThirdPartyId(invite_id);
|
||||
let request = invite_user::Request::new(room_id, recipient);
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Search the homeserver's directory of public rooms.
|
||||
|
@ -939,7 +943,7 @@ impl Client {
|
|||
since,
|
||||
server,
|
||||
});
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Search the homeserver's directory of public rooms with a filter.
|
||||
|
@ -977,7 +981,7 @@ impl Client {
|
|||
room_search: impl Into<get_public_rooms_filtered::Request<'_>>,
|
||||
) -> Result<get_public_rooms_filtered::Response> {
|
||||
let request = room_search.into();
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Create a room using the `RoomBuilder` and send the request.
|
||||
|
@ -1009,7 +1013,7 @@ impl Client {
|
|||
room: impl Into<create_room::Request<'_>>,
|
||||
) -> Result<create_room::Response> {
|
||||
let request = room.into();
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and returns
|
||||
|
@ -1044,8 +1048,8 @@ impl Client {
|
|||
&self,
|
||||
request: impl Into<get_message_events::Request<'_>>,
|
||||
) -> Result<get_message_events::Response> {
|
||||
let req = request.into();
|
||||
self.send(req).await
|
||||
let request = request.into();
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Send a request to notify the room of a user typing.
|
||||
|
@ -1088,7 +1092,7 @@ impl Client {
|
|||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let request = TypingRequest::new(&user_id, room_id, typing.into());
|
||||
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Send a request to notify the room the user has read specific event.
|
||||
|
@ -1107,7 +1111,7 @@ impl Client {
|
|||
) -> Result<create_receipt::Response> {
|
||||
let request =
|
||||
create_receipt::Request::new(room_id, create_receipt::ReceiptType::Read, event_id);
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Send a request to notify the room user has read up to specific event.
|
||||
|
@ -1130,7 +1134,7 @@ impl Client {
|
|||
let request = assign!(set_read_marker::Request::new(room_id, fully_read), {
|
||||
read_receipt
|
||||
});
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Share a group session for the given room.
|
||||
|
@ -1261,7 +1265,7 @@ impl Client {
|
|||
let txn_id = txn_id.unwrap_or_else(Uuid::new_v4).to_string();
|
||||
let request = send_message_event::Request::new(&room_id, &txn_id, &content);
|
||||
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
|
@ -1448,11 +1452,13 @@ impl Client {
|
|||
let mut data = Vec::new();
|
||||
reader.read_to_end(&mut data)?;
|
||||
|
||||
let timeout = Duration::from_secs(data.len() as u64 / DEFAULT_UPLOAD_SPEED);
|
||||
|
||||
let request = assign!(create_content::Request::new(data), {
|
||||
content_type: Some(content_type.essence_str()),
|
||||
});
|
||||
|
||||
Ok(self.http_client.upload(request).await?)
|
||||
Ok(self.http_client.upload(request, Some(timeout)).await?)
|
||||
}
|
||||
|
||||
/// Send an arbitrary request to the server, without updating client state.
|
||||
|
@ -1466,6 +1472,9 @@ impl Client {
|
|||
///
|
||||
/// * `request` - A filled out and valid request for the endpoint to be hit
|
||||
///
|
||||
/// * `timeout` - An optional request timeout setting, this overrides the
|
||||
/// default request setting if one was set.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```no_run
|
||||
|
@ -1486,18 +1495,22 @@ impl Client {
|
|||
/// let request = profile::get_profile::Request::new(&user_id);
|
||||
///
|
||||
/// // Start the request using Client::send()
|
||||
/// let response = client.send(request).await.unwrap();
|
||||
/// let response = client.send(request, None).await.unwrap();
|
||||
///
|
||||
/// // Check the corresponding Response struct to find out what types are
|
||||
/// // returned
|
||||
/// # })
|
||||
/// ```
|
||||
pub async fn send<Request>(&self, request: Request) -> Result<Request::IncomingResponse>
|
||||
pub async fn send<Request>(
|
||||
&self,
|
||||
request: Request,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Request::IncomingResponse>
|
||||
where
|
||||
Request: OutgoingRequest + Debug,
|
||||
HttpError: From<FromHttpResponseError<Request::EndpointError>>,
|
||||
{
|
||||
Ok(self.http_client.send(request).await?)
|
||||
Ok(self.http_client.send(request, timeout).await?)
|
||||
}
|
||||
|
||||
#[cfg(feature = "encryption")]
|
||||
|
@ -1512,7 +1525,7 @@ impl Client {
|
|||
request.messages.clone(),
|
||||
);
|
||||
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Get information of all our own devices.
|
||||
|
@ -1541,7 +1554,7 @@ impl Client {
|
|||
pub async fn devices(&self) -> Result<get_devices::Response> {
|
||||
let request = get_devices::Request::new();
|
||||
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Delete the given devices from the server.
|
||||
|
@ -1606,13 +1619,13 @@ impl Client {
|
|||
let mut request = delete_devices::Request::new(devices);
|
||||
request.auth = auth_data;
|
||||
|
||||
self.send(request).await
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
/// Get the room members for the given room.
|
||||
pub async fn room_members(&self, room_id: &RoomId) -> Result<MembersResponse> {
|
||||
let request = get_member_events::Request::new(room_id);
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
|
||||
Ok(self.base_client.receive_members(room_id, &response).await?)
|
||||
}
|
||||
|
@ -1638,7 +1651,12 @@ impl Client {
|
|||
timeout: sync_settings.timeout,
|
||||
});
|
||||
|
||||
let response = self.send(request).await?;
|
||||
let timeout = sync_settings
|
||||
.timeout
|
||||
.unwrap_or_else(|| Duration::from_secs(0))
|
||||
+ SYNC_REQUEST_TIMEOUT;
|
||||
|
||||
let response = self.send(request, Some(timeout)).await?;
|
||||
|
||||
Ok(self.base_client.receive_sync_response(response).await?)
|
||||
}
|
||||
|
@ -1779,7 +1797,7 @@ impl Client {
|
|||
}
|
||||
OutgoingRequests::SignatureUpload(request) => {
|
||||
// TODO remove this unwrap.
|
||||
if let Ok(resp) = self.send(request.clone()).await {
|
||||
if let Ok(resp) = self.send(request.clone(), None).await {
|
||||
self.base_client
|
||||
.mark_request_as_sent(&r.request_id(), &resp)
|
||||
.await
|
||||
|
@ -1839,7 +1857,7 @@ impl Client {
|
|||
let _lock = self.key_claim_lock.lock().await;
|
||||
|
||||
if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? {
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
self.base_client
|
||||
.mark_request_as_sent(&request_id, &response)
|
||||
.await?;
|
||||
|
@ -1898,7 +1916,7 @@ impl Client {
|
|||
request.one_time_keys.as_ref().map_or(0, |k| k.len())
|
||||
);
|
||||
|
||||
let response = self.send(request.clone()).await?;
|
||||
let response = self.send(request.clone(), None).await?;
|
||||
self.base_client
|
||||
.mark_request_as_sent(request_id, &response)
|
||||
.await?;
|
||||
|
@ -1927,7 +1945,7 @@ impl Client {
|
|||
) -> Result<get_keys::Response> {
|
||||
let request = assign!(get_keys::Request::new(), { device_keys });
|
||||
|
||||
let response = self.send(request).await?;
|
||||
let response = self.send(request, None).await?;
|
||||
self.base_client
|
||||
.mark_request_as_sent(request_id, &response)
|
||||
.await?;
|
||||
|
@ -2080,8 +2098,8 @@ impl Client {
|
|||
user_signing_key: request.user_signing_key,
|
||||
});
|
||||
|
||||
self.send(request).await?;
|
||||
self.send(signature_request).await?;
|
||||
self.send(request, None).await?;
|
||||
self.send(signature_request, None).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -24,12 +24,15 @@ use tracing::trace;
|
|||
use url::Url;
|
||||
|
||||
use matrix_sdk_common::{
|
||||
api::r0::media::create_content, async_trait, locks::RwLock, AsyncTraitDeps, AuthScheme,
|
||||
FromHttpResponseError,
|
||||
api::r0::media::create_content, async_trait, instant::Duration, locks::RwLock, AsyncTraitDeps,
|
||||
AuthScheme, FromHttpResponseError,
|
||||
};
|
||||
|
||||
use crate::{error::HttpError, ClientConfig, OutgoingRequest, Session};
|
||||
|
||||
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Abstraction around the http layer. The allows implementors to use different
|
||||
/// http libraries.
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
|
@ -78,6 +81,7 @@ pub trait HttpSend: AsyncTraitDeps {
|
|||
async fn send_request(
|
||||
&self,
|
||||
request: http::Request<Vec<u8>>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<http::Response<Vec<u8>>, HttpError>;
|
||||
}
|
||||
|
||||
|
@ -94,6 +98,7 @@ impl HttpClient {
|
|||
request: Request,
|
||||
session: Arc<RwLock<Option<Session>>>,
|
||||
content_type: Option<HeaderValue>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||
let mut request = {
|
||||
let read_guard;
|
||||
|
@ -122,15 +127,16 @@ impl HttpClient {
|
|||
}
|
||||
}
|
||||
|
||||
self.inner.send_request(request).await
|
||||
self.inner.send_request(request, timeout).await
|
||||
}
|
||||
|
||||
pub async fn upload(
|
||||
&self,
|
||||
request: create_content::Request<'_>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<create_content::Response, HttpError> {
|
||||
let response = self
|
||||
.send_request(request, self.session.clone(), None)
|
||||
.send_request(request, self.session.clone(), None, timeout)
|
||||
.await?;
|
||||
Ok(create_content::Response::try_from(response)?)
|
||||
}
|
||||
|
@ -138,6 +144,7 @@ impl HttpClient {
|
|||
pub async fn send<Request>(
|
||||
&self,
|
||||
request: Request,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Request::IncomingResponse, HttpError>
|
||||
where
|
||||
Request: OutgoingRequest + Debug,
|
||||
|
@ -145,7 +152,7 @@ impl HttpClient {
|
|||
{
|
||||
let content_type = HeaderValue::from_static("application/json");
|
||||
let response = self
|
||||
.send_request(request, self.session.clone(), Some(content_type))
|
||||
.send_request(request, self.session.clone(), Some(content_type), timeout)
|
||||
.await?;
|
||||
|
||||
trace!("Got response: {:?}", response);
|
||||
|
@ -164,7 +171,7 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result<Client, HttpEr
|
|||
let http_client = {
|
||||
let http_client = match config.timeout {
|
||||
Some(x) => http_client.timeout(x),
|
||||
None => http_client,
|
||||
None => http_client.timeout(DEFAULT_REQUEST_TIMEOUT),
|
||||
};
|
||||
|
||||
let http_client = if config.disable_ssl_verification {
|
||||
|
@ -188,7 +195,9 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result<Client, HttpEr
|
|||
|
||||
headers.insert(reqwest::header::USER_AGENT, user_agent);
|
||||
|
||||
http_client.default_headers(headers)
|
||||
http_client
|
||||
.default_headers(headers)
|
||||
.connect_timeout(DEFAULT_CONNECTION_TIMEOUT)
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
|
@ -225,6 +234,7 @@ async fn response_to_http_response(
|
|||
async fn send_request(
|
||||
client: &Client,
|
||||
request: http::Request<Vec<u8>>,
|
||||
_: Option<Duration>,
|
||||
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||
let request = reqwest::Request::try_from(request)?;
|
||||
let response = client.execute(request).await?;
|
||||
|
@ -236,10 +246,16 @@ async fn send_request(
|
|||
async fn send_request(
|
||||
client: &Client,
|
||||
request: http::Request<Vec<u8>>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||
let backoff = ExponentialBackoff::default();
|
||||
// TODO set a sensible timeout for the request here.
|
||||
let request = &reqwest::Request::try_from(request)?;
|
||||
let mut request = reqwest::Request::try_from(request)?;
|
||||
|
||||
if let Some(timeout) = timeout {
|
||||
*request.timeout_mut() = Some(timeout);
|
||||
}
|
||||
|
||||
let request = &request;
|
||||
|
||||
let request = || async move {
|
||||
let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?;
|
||||
|
@ -274,7 +290,8 @@ impl HttpSend for Client {
|
|||
async fn send_request(
|
||||
&self,
|
||||
request: http::Request<Vec<u8>>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||
send_request(&self, request).await
|
||||
send_request(&self, request, timeout).await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ impl Sas {
|
|||
}
|
||||
|
||||
if let Some(s) = signature {
|
||||
self.client.send(s).await?;
|
||||
self.client.send(s, None).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in a new issue