From a551ae2beedf2049755b3ac939cef8eb555bb98d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 1 Feb 2021 17:15:29 +0100 Subject: [PATCH] matrix-sdk: Add sensible connection and request timeouts This sets the default * connection timeout to 5s * request timeout to 10s * request timeout for syncs to the sync timeout + 10s * request timeout for uploads to be based on 1Mbps upload speed expectations --- matrix_sdk/examples/get_profiles.rs | 2 +- matrix_sdk/src/client.rs | 96 +++++++++++++++++------------ matrix_sdk/src/http_client.rs | 37 ++++++++--- matrix_sdk/src/sas.rs | 2 +- 4 files changed, 86 insertions(+), 51 deletions(-) diff --git a/matrix_sdk/examples/get_profiles.rs b/matrix_sdk/examples/get_profiles.rs index c75d25b4..4d20bb76 100644 --- a/matrix_sdk/examples/get_profiles.rs +++ b/matrix_sdk/examples/get_profiles.rs @@ -19,7 +19,7 @@ async fn get_profile(client: Client, mxid: &UserId) -> MatrixResult let request = profile::get_profile::Request::new(mxid); // Start the request using matrix_sdk::Client::send - let resp = client.send(request).await?; + let resp = client.send(request, None).await?; // Use the response and construct a UserProfile struct. // See https://docs.rs/ruma-client-api/0.9.0/ruma_client_api/r0/profile/get_profile/struct.Response.html diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 6cdb7000..0d4abfe2 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -132,6 +132,10 @@ use crate::{ }; const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30); +/// Give the sync a bit more time than the default request timeout does. +const SYNC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); +/// A conservative upload speed of 1Mbps +const DEFAULT_UPLOAD_SPEED: u64 = 125_000; /// An async/await enabled Matrix client. /// @@ -452,7 +456,7 @@ impl Client { pub async fn display_name(&self) -> Result> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = get_display_name::Request::new(&user_id); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(response.displayname) } @@ -475,7 +479,7 @@ impl Client { pub async fn set_display_name(&self, name: Option<&str>) -> Result<()> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = set_display_name::Request::new(&user_id, name); - self.send(request).await?; + self.send(request, None).await?; Ok(()) } @@ -500,7 +504,7 @@ impl Client { pub async fn avatar_url(&self) -> Result> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = get_avatar_url::Request::new(&user_id); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(response.avatar_url) } @@ -513,7 +517,7 @@ impl Client { pub async fn set_avatar_url(&self, url: Option<&str>) -> Result<()> { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = set_avatar_url::Request::new(&user_id, url); - self.send(request).await?; + self.send(request, None).await?; Ok(()) } @@ -672,7 +676,7 @@ impl Client { } ); - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client.receive_login_response(&response).await?; Ok(response) @@ -734,7 +738,7 @@ impl Client { info!("Registering to {}", self.homeserver); let request = registration.into(); - self.send(request).await + self.send(request, None).await } /// Get or upload a sync filter. @@ -748,7 +752,7 @@ impl Client { } else { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = FilterUploadRequest::new(&user_id, definition); - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client .receive_filter_upload(filter_name, &response) @@ -768,7 +772,7 @@ impl Client { /// * `room_id` - The `RoomId` of the room to be joined. pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result { let request = join_room_by_id::Request::new(room_id); - self.send(request).await + self.send(request, None).await } /// Join a room by `RoomId`. @@ -788,7 +792,7 @@ impl Client { let request = assign!(join_room_by_id_or_alias::Request::new(alias), { server_name: server_names, }); - self.send(request).await + self.send(request, None).await } /// Forget a room by `RoomId`. @@ -800,7 +804,7 @@ impl Client { /// * `room_id` - The `RoomId` of the room to be forget. pub async fn forget_room_by_id(&self, room_id: &RoomId) -> Result { let request = forget_room::Request::new(room_id); - self.send(request).await + self.send(request, None).await } /// Ban a user from a room by `RoomId` and `UserId`. @@ -821,7 +825,7 @@ impl Client { reason: Option<&str>, ) -> Result { let request = assign!(ban_user::Request::new(room_id, user_id), { reason }); - self.send(request).await + self.send(request, None).await } /// Kick a user out of the specified room. @@ -842,7 +846,7 @@ impl Client { reason: Option<&str>, ) -> Result { let request = assign!(kick_user::Request::new(room_id, user_id), { reason }); - self.send(request).await + self.send(request, None).await } /// Leave the specified room. @@ -854,7 +858,7 @@ impl Client { /// * `room_id` - The `RoomId` of the room to leave. pub async fn leave_room(&self, room_id: &RoomId) -> Result { let request = leave_room::Request::new(room_id); - self.send(request).await + self.send(request, None).await } /// Invite the specified user by `UserId` to the given room. @@ -874,7 +878,7 @@ impl Client { let recipient = InvitationRecipient::UserId { user_id }; let request = invite_user::Request::new(room_id, recipient); - self.send(request).await + self.send(request, None).await } /// Invite the specified user by third party id to the given room. @@ -893,7 +897,7 @@ impl Client { ) -> Result { let recipient = InvitationRecipient::ThirdPartyId(invite_id); let request = invite_user::Request::new(room_id, recipient); - self.send(request).await + self.send(request, None).await } /// Search the homeserver's directory of public rooms. @@ -939,7 +943,7 @@ impl Client { since, server, }); - self.send(request).await + self.send(request, None).await } /// Search the homeserver's directory of public rooms with a filter. @@ -977,7 +981,7 @@ impl Client { room_search: impl Into>, ) -> Result { let request = room_search.into(); - self.send(request).await + self.send(request, None).await } /// Create a room using the `RoomBuilder` and send the request. @@ -1009,7 +1013,7 @@ impl Client { room: impl Into>, ) -> Result { let request = room.into(); - self.send(request).await + self.send(request, None).await } /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and returns @@ -1044,8 +1048,8 @@ impl Client { &self, request: impl Into>, ) -> Result { - let req = request.into(); - self.send(req).await + let request = request.into(); + self.send(request, None).await } /// Send a request to notify the room of a user typing. @@ -1088,7 +1092,7 @@ impl Client { let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?; let request = TypingRequest::new(&user_id, room_id, typing.into()); - self.send(request).await + self.send(request, None).await } /// Send a request to notify the room the user has read specific event. @@ -1107,7 +1111,7 @@ impl Client { ) -> Result { let request = create_receipt::Request::new(room_id, create_receipt::ReceiptType::Read, event_id); - self.send(request).await + self.send(request, None).await } /// Send a request to notify the room user has read up to specific event. @@ -1130,7 +1134,7 @@ impl Client { let request = assign!(set_read_marker::Request::new(room_id, fully_read), { read_receipt }); - self.send(request).await + self.send(request, None).await } /// Share a group session for the given room. @@ -1261,7 +1265,7 @@ impl Client { let txn_id = txn_id.unwrap_or_else(Uuid::new_v4).to_string(); let request = send_message_event::Request::new(&room_id, &txn_id, &content); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(response) } @@ -1448,11 +1452,13 @@ impl Client { let mut data = Vec::new(); reader.read_to_end(&mut data)?; + let timeout = Duration::from_secs(data.len() as u64 / DEFAULT_UPLOAD_SPEED); + let request = assign!(create_content::Request::new(data), { content_type: Some(content_type.essence_str()), }); - Ok(self.http_client.upload(request).await?) + Ok(self.http_client.upload(request, Some(timeout)).await?) } /// Send an arbitrary request to the server, without updating client state. @@ -1466,6 +1472,9 @@ impl Client { /// /// * `request` - A filled out and valid request for the endpoint to be hit /// + /// * `timeout` - An optional request timeout setting, this overrides the + /// default request setting if one was set. + /// /// # Example /// /// ```no_run @@ -1486,18 +1495,22 @@ impl Client { /// let request = profile::get_profile::Request::new(&user_id); /// /// // Start the request using Client::send() - /// let response = client.send(request).await.unwrap(); + /// let response = client.send(request, None).await.unwrap(); /// /// // Check the corresponding Response struct to find out what types are /// // returned /// # }) /// ``` - pub async fn send(&self, request: Request) -> Result + pub async fn send( + &self, + request: Request, + timeout: Option, + ) -> Result where Request: OutgoingRequest + Debug, HttpError: From>, { - Ok(self.http_client.send(request).await?) + Ok(self.http_client.send(request, timeout).await?) } #[cfg(feature = "encryption")] @@ -1512,7 +1525,7 @@ impl Client { request.messages.clone(), ); - self.send(request).await + self.send(request, None).await } /// Get information of all our own devices. @@ -1541,7 +1554,7 @@ impl Client { pub async fn devices(&self) -> Result { let request = get_devices::Request::new(); - self.send(request).await + self.send(request, None).await } /// Delete the given devices from the server. @@ -1606,13 +1619,13 @@ impl Client { let mut request = delete_devices::Request::new(devices); request.auth = auth_data; - self.send(request).await + self.send(request, None).await } /// Get the room members for the given room. pub async fn room_members(&self, room_id: &RoomId) -> Result { let request = get_member_events::Request::new(room_id); - let response = self.send(request).await?; + let response = self.send(request, None).await?; Ok(self.base_client.receive_members(room_id, &response).await?) } @@ -1638,7 +1651,12 @@ impl Client { timeout: sync_settings.timeout, }); - let response = self.send(request).await?; + let timeout = sync_settings + .timeout + .unwrap_or_else(|| Duration::from_secs(0)) + + SYNC_REQUEST_TIMEOUT; + + let response = self.send(request, Some(timeout)).await?; Ok(self.base_client.receive_sync_response(response).await?) } @@ -1779,7 +1797,7 @@ impl Client { } OutgoingRequests::SignatureUpload(request) => { // TODO remove this unwrap. - if let Ok(resp) = self.send(request.clone()).await { + if let Ok(resp) = self.send(request.clone(), None).await { self.base_client .mark_request_as_sent(&r.request_id(), &resp) .await @@ -1839,7 +1857,7 @@ impl Client { let _lock = self.key_claim_lock.lock().await; if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? { - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client .mark_request_as_sent(&request_id, &response) .await?; @@ -1898,7 +1916,7 @@ impl Client { request.one_time_keys.as_ref().map_or(0, |k| k.len()) ); - let response = self.send(request.clone()).await?; + let response = self.send(request.clone(), None).await?; self.base_client .mark_request_as_sent(request_id, &response) .await?; @@ -1927,7 +1945,7 @@ impl Client { ) -> Result { let request = assign!(get_keys::Request::new(), { device_keys }); - let response = self.send(request).await?; + let response = self.send(request, None).await?; self.base_client .mark_request_as_sent(request_id, &response) .await?; @@ -2080,8 +2098,8 @@ impl Client { user_signing_key: request.user_signing_key, }); - self.send(request).await?; - self.send(signature_request).await?; + self.send(request, None).await?; + self.send(signature_request, None).await?; Ok(()) } diff --git a/matrix_sdk/src/http_client.rs b/matrix_sdk/src/http_client.rs index 0b692856..d995e1eb 100644 --- a/matrix_sdk/src/http_client.rs +++ b/matrix_sdk/src/http_client.rs @@ -24,12 +24,15 @@ use tracing::trace; use url::Url; use matrix_sdk_common::{ - api::r0::media::create_content, async_trait, locks::RwLock, AsyncTraitDeps, AuthScheme, - FromHttpResponseError, + api::r0::media::create_content, async_trait, instant::Duration, locks::RwLock, AsyncTraitDeps, + AuthScheme, FromHttpResponseError, }; use crate::{error::HttpError, ClientConfig, OutgoingRequest, Session}; +const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + /// Abstraction around the http layer. The allows implementors to use different /// http libraries. #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -78,6 +81,7 @@ pub trait HttpSend: AsyncTraitDeps { async fn send_request( &self, request: http::Request>, + timeout: Option, ) -> Result>, HttpError>; } @@ -94,6 +98,7 @@ impl HttpClient { request: Request, session: Arc>>, content_type: Option, + timeout: Option, ) -> Result>, HttpError> { let mut request = { let read_guard; @@ -122,15 +127,16 @@ impl HttpClient { } } - self.inner.send_request(request).await + self.inner.send_request(request, timeout).await } pub async fn upload( &self, request: create_content::Request<'_>, + timeout: Option, ) -> Result { let response = self - .send_request(request, self.session.clone(), None) + .send_request(request, self.session.clone(), None, timeout) .await?; Ok(create_content::Response::try_from(response)?) } @@ -138,6 +144,7 @@ impl HttpClient { pub async fn send( &self, request: Request, + timeout: Option, ) -> Result where Request: OutgoingRequest + Debug, @@ -145,7 +152,7 @@ impl HttpClient { { let content_type = HeaderValue::from_static("application/json"); let response = self - .send_request(request, self.session.clone(), Some(content_type)) + .send_request(request, self.session.clone(), Some(content_type), timeout) .await?; trace!("Got response: {:?}", response); @@ -164,7 +171,7 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result http_client.timeout(x), - None => http_client, + None => http_client.timeout(DEFAULT_REQUEST_TIMEOUT), }; let http_client = if config.disable_ssl_verification { @@ -188,7 +195,9 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result>, + _: Option, ) -> Result>, HttpError> { let request = reqwest::Request::try_from(request)?; let response = client.execute(request).await?; @@ -236,10 +246,16 @@ async fn send_request( async fn send_request( client: &Client, request: http::Request>, + timeout: Option, ) -> Result>, HttpError> { let backoff = ExponentialBackoff::default(); - // TODO set a sensible timeout for the request here. - let request = &reqwest::Request::try_from(request)?; + let mut request = reqwest::Request::try_from(request)?; + + if let Some(timeout) = timeout { + *request.timeout_mut() = Some(timeout); + } + + let request = &request; let request = || async move { let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?; @@ -274,7 +290,8 @@ impl HttpSend for Client { async fn send_request( &self, request: http::Request>, + timeout: Option, ) -> Result>, HttpError> { - send_request(&self, request).await + send_request(&self, request, timeout).await } } diff --git a/matrix_sdk/src/sas.rs b/matrix_sdk/src/sas.rs index ccbc4224..389633f7 100644 --- a/matrix_sdk/src/sas.rs +++ b/matrix_sdk/src/sas.rs @@ -54,7 +54,7 @@ impl Sas { } if let Some(s) = signature { - self.client.send(s).await?; + self.client.send(s, None).await?; } Ok(())