Merge branch 'request-retrying'
commit
1e67f338ac
|
@ -0,0 +1,47 @@
|
||||||
|
"""
|
||||||
|
A mitmproxy script that introduces certain request failures in a deterministic
|
||||||
|
way.
|
||||||
|
|
||||||
|
Used mainly for Matrix style requests.
|
||||||
|
|
||||||
|
To run execute it with mitmproxy:
|
||||||
|
|
||||||
|
>>> mitmproxy -s failures.py`
|
||||||
|
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
|
||||||
|
from mitmproxy import http
|
||||||
|
from mitmproxy.script import concurrent
|
||||||
|
|
||||||
|
REQUEST_COUNT = 0
|
||||||
|
|
||||||
|
|
||||||
|
@concurrent
|
||||||
|
def request(flow):
|
||||||
|
global REQUEST_COUNT
|
||||||
|
|
||||||
|
REQUEST_COUNT += 1
|
||||||
|
|
||||||
|
if REQUEST_COUNT % 2 == 0:
|
||||||
|
return
|
||||||
|
elif REQUEST_COUNT % 3 == 0:
|
||||||
|
flow.response = http.HTTPResponse.make(
|
||||||
|
500,
|
||||||
|
b"Gateway error",
|
||||||
|
)
|
||||||
|
elif REQUEST_COUNT % 7 == 0:
|
||||||
|
if "sync" in flow.request.pretty_url:
|
||||||
|
time.sleep(60)
|
||||||
|
else:
|
||||||
|
time.sleep(30)
|
||||||
|
else:
|
||||||
|
flow.response = http.HTTPResponse.make(
|
||||||
|
429,
|
||||||
|
json.dumps({
|
||||||
|
"errcode": "M_LIMIT_EXCEEDED",
|
||||||
|
"error": "Too many requests",
|
||||||
|
"retry_after_ms": 2000
|
||||||
|
})
|
||||||
|
)
|
|
@ -50,6 +50,11 @@ default_features = false
|
||||||
version = "0.11.0"
|
version = "0.11.0"
|
||||||
default_features = false
|
default_features = false
|
||||||
|
|
||||||
|
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.backoff]
|
||||||
|
git = "https://github.com/ihrwein/backoff"
|
||||||
|
features = ["tokio"]
|
||||||
|
rev = "fa3fb91431729ce871d29c62b93425b8aec740f4"
|
||||||
|
|
||||||
[dependencies.tracing-futures]
|
[dependencies.tracing-futures]
|
||||||
version = "0.2.4"
|
version = "0.2.4"
|
||||||
default-features = false
|
default-features = false
|
||||||
|
|
|
@ -19,7 +19,7 @@ async fn get_profile(client: Client, mxid: &UserId) -> MatrixResult<UserProfile>
|
||||||
let request = profile::get_profile::Request::new(mxid);
|
let request = profile::get_profile::Request::new(mxid);
|
||||||
|
|
||||||
// Start the request using matrix_sdk::Client::send
|
// Start the request using matrix_sdk::Client::send
|
||||||
let resp = client.send(request).await?;
|
let resp = client.send(request, None).await?;
|
||||||
|
|
||||||
// Use the response and construct a UserProfile struct.
|
// Use the response and construct a UserProfile struct.
|
||||||
// See https://docs.rs/ruma-client-api/0.9.0/ruma_client_api/r0/profile/get_profile/struct.Response.html
|
// See https://docs.rs/ruma-client-api/0.9.0/ruma_client_api/r0/profile/get_profile/struct.Response.html
|
||||||
|
|
|
@ -118,6 +118,7 @@ use matrix_sdk_common::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
error::HttpError,
|
||||||
http_client::{client_with_config, HttpClient, HttpSend},
|
http_client::{client_with_config, HttpClient, HttpSend},
|
||||||
Error, OutgoingRequest, Result,
|
Error, OutgoingRequest, Result,
|
||||||
};
|
};
|
||||||
|
@ -131,6 +132,12 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
|
const DEFAULT_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
/// Give the sync a bit more time than the default request timeout does.
|
||||||
|
const SYNC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
|
||||||
|
/// A conservative upload speed of 1Mbps
|
||||||
|
const DEFAULT_UPLOAD_SPEED: u64 = 125_000;
|
||||||
|
/// 5 min minimal upload request timeout, used to clamp the request timeout.
|
||||||
|
const MIN_UPLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 5);
|
||||||
|
|
||||||
/// An async/await enabled Matrix client.
|
/// An async/await enabled Matrix client.
|
||||||
///
|
///
|
||||||
|
@ -451,7 +458,7 @@ impl Client {
|
||||||
pub async fn display_name(&self) -> Result<Option<String>> {
|
pub async fn display_name(&self) -> Result<Option<String>> {
|
||||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||||
let request = get_display_name::Request::new(&user_id);
|
let request = get_display_name::Request::new(&user_id);
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
Ok(response.displayname)
|
Ok(response.displayname)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,7 +481,7 @@ impl Client {
|
||||||
pub async fn set_display_name(&self, name: Option<&str>) -> Result<()> {
|
pub async fn set_display_name(&self, name: Option<&str>) -> Result<()> {
|
||||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||||
let request = set_display_name::Request::new(&user_id, name);
|
let request = set_display_name::Request::new(&user_id, name);
|
||||||
self.send(request).await?;
|
self.send(request, None).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -499,7 +506,7 @@ impl Client {
|
||||||
pub async fn avatar_url(&self) -> Result<Option<String>> {
|
pub async fn avatar_url(&self) -> Result<Option<String>> {
|
||||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||||
let request = get_avatar_url::Request::new(&user_id);
|
let request = get_avatar_url::Request::new(&user_id);
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
Ok(response.avatar_url)
|
Ok(response.avatar_url)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -512,7 +519,7 @@ impl Client {
|
||||||
pub async fn set_avatar_url(&self, url: Option<&str>) -> Result<()> {
|
pub async fn set_avatar_url(&self, url: Option<&str>) -> Result<()> {
|
||||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||||
let request = set_avatar_url::Request::new(&user_id, url);
|
let request = set_avatar_url::Request::new(&user_id, url);
|
||||||
self.send(request).await?;
|
self.send(request, None).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -671,7 +678,7 @@ impl Client {
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
self.base_client.receive_login_response(&response).await?;
|
self.base_client.receive_login_response(&response).await?;
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
|
@ -733,7 +740,7 @@ impl Client {
|
||||||
info!("Registering to {}", self.homeserver);
|
info!("Registering to {}", self.homeserver);
|
||||||
|
|
||||||
let request = registration.into();
|
let request = registration.into();
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get or upload a sync filter.
|
/// Get or upload a sync filter.
|
||||||
|
@ -747,7 +754,7 @@ impl Client {
|
||||||
} else {
|
} else {
|
||||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||||
let request = FilterUploadRequest::new(&user_id, definition);
|
let request = FilterUploadRequest::new(&user_id, definition);
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
|
|
||||||
self.base_client
|
self.base_client
|
||||||
.receive_filter_upload(filter_name, &response)
|
.receive_filter_upload(filter_name, &response)
|
||||||
|
@ -767,7 +774,7 @@ impl Client {
|
||||||
/// * `room_id` - The `RoomId` of the room to be joined.
|
/// * `room_id` - The `RoomId` of the room to be joined.
|
||||||
pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<join_room_by_id::Response> {
|
pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<join_room_by_id::Response> {
|
||||||
let request = join_room_by_id::Request::new(room_id);
|
let request = join_room_by_id::Request::new(room_id);
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Join a room by `RoomId`.
|
/// Join a room by `RoomId`.
|
||||||
|
@ -787,7 +794,7 @@ impl Client {
|
||||||
let request = assign!(join_room_by_id_or_alias::Request::new(alias), {
|
let request = assign!(join_room_by_id_or_alias::Request::new(alias), {
|
||||||
server_name: server_names,
|
server_name: server_names,
|
||||||
});
|
});
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Forget a room by `RoomId`.
|
/// Forget a room by `RoomId`.
|
||||||
|
@ -799,7 +806,7 @@ impl Client {
|
||||||
/// * `room_id` - The `RoomId` of the room to be forget.
|
/// * `room_id` - The `RoomId` of the room to be forget.
|
||||||
pub async fn forget_room_by_id(&self, room_id: &RoomId) -> Result<forget_room::Response> {
|
pub async fn forget_room_by_id(&self, room_id: &RoomId) -> Result<forget_room::Response> {
|
||||||
let request = forget_room::Request::new(room_id);
|
let request = forget_room::Request::new(room_id);
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ban a user from a room by `RoomId` and `UserId`.
|
/// Ban a user from a room by `RoomId` and `UserId`.
|
||||||
|
@ -820,7 +827,7 @@ impl Client {
|
||||||
reason: Option<&str>,
|
reason: Option<&str>,
|
||||||
) -> Result<ban_user::Response> {
|
) -> Result<ban_user::Response> {
|
||||||
let request = assign!(ban_user::Request::new(room_id, user_id), { reason });
|
let request = assign!(ban_user::Request::new(room_id, user_id), { reason });
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Kick a user out of the specified room.
|
/// Kick a user out of the specified room.
|
||||||
|
@ -841,7 +848,7 @@ impl Client {
|
||||||
reason: Option<&str>,
|
reason: Option<&str>,
|
||||||
) -> Result<kick_user::Response> {
|
) -> Result<kick_user::Response> {
|
||||||
let request = assign!(kick_user::Request::new(room_id, user_id), { reason });
|
let request = assign!(kick_user::Request::new(room_id, user_id), { reason });
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Leave the specified room.
|
/// Leave the specified room.
|
||||||
|
@ -853,7 +860,7 @@ impl Client {
|
||||||
/// * `room_id` - The `RoomId` of the room to leave.
|
/// * `room_id` - The `RoomId` of the room to leave.
|
||||||
pub async fn leave_room(&self, room_id: &RoomId) -> Result<leave_room::Response> {
|
pub async fn leave_room(&self, room_id: &RoomId) -> Result<leave_room::Response> {
|
||||||
let request = leave_room::Request::new(room_id);
|
let request = leave_room::Request::new(room_id);
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Invite the specified user by `UserId` to the given room.
|
/// Invite the specified user by `UserId` to the given room.
|
||||||
|
@ -873,7 +880,7 @@ impl Client {
|
||||||
let recipient = InvitationRecipient::UserId { user_id };
|
let recipient = InvitationRecipient::UserId { user_id };
|
||||||
|
|
||||||
let request = invite_user::Request::new(room_id, recipient);
|
let request = invite_user::Request::new(room_id, recipient);
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Invite the specified user by third party id to the given room.
|
/// Invite the specified user by third party id to the given room.
|
||||||
|
@ -892,7 +899,7 @@ impl Client {
|
||||||
) -> Result<invite_user::Response> {
|
) -> Result<invite_user::Response> {
|
||||||
let recipient = InvitationRecipient::ThirdPartyId(invite_id);
|
let recipient = InvitationRecipient::ThirdPartyId(invite_id);
|
||||||
let request = invite_user::Request::new(room_id, recipient);
|
let request = invite_user::Request::new(room_id, recipient);
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Search the homeserver's directory of public rooms.
|
/// Search the homeserver's directory of public rooms.
|
||||||
|
@ -938,7 +945,7 @@ impl Client {
|
||||||
since,
|
since,
|
||||||
server,
|
server,
|
||||||
});
|
});
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Search the homeserver's directory of public rooms with a filter.
|
/// Search the homeserver's directory of public rooms with a filter.
|
||||||
|
@ -976,7 +983,7 @@ impl Client {
|
||||||
room_search: impl Into<get_public_rooms_filtered::Request<'_>>,
|
room_search: impl Into<get_public_rooms_filtered::Request<'_>>,
|
||||||
) -> Result<get_public_rooms_filtered::Response> {
|
) -> Result<get_public_rooms_filtered::Response> {
|
||||||
let request = room_search.into();
|
let request = room_search.into();
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a room using the `RoomBuilder` and send the request.
|
/// Create a room using the `RoomBuilder` and send the request.
|
||||||
|
@ -1008,7 +1015,7 @@ impl Client {
|
||||||
room: impl Into<create_room::Request<'_>>,
|
room: impl Into<create_room::Request<'_>>,
|
||||||
) -> Result<create_room::Response> {
|
) -> Result<create_room::Response> {
|
||||||
let request = room.into();
|
let request = room.into();
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and returns
|
/// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and returns
|
||||||
|
@ -1043,8 +1050,8 @@ impl Client {
|
||||||
&self,
|
&self,
|
||||||
request: impl Into<get_message_events::Request<'_>>,
|
request: impl Into<get_message_events::Request<'_>>,
|
||||||
) -> Result<get_message_events::Response> {
|
) -> Result<get_message_events::Response> {
|
||||||
let req = request.into();
|
let request = request.into();
|
||||||
self.send(req).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a request to notify the room of a user typing.
|
/// Send a request to notify the room of a user typing.
|
||||||
|
@ -1087,7 +1094,7 @@ impl Client {
|
||||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||||
let request = TypingRequest::new(&user_id, room_id, typing.into());
|
let request = TypingRequest::new(&user_id, room_id, typing.into());
|
||||||
|
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a request to notify the room the user has read specific event.
|
/// Send a request to notify the room the user has read specific event.
|
||||||
|
@ -1106,7 +1113,7 @@ impl Client {
|
||||||
) -> Result<create_receipt::Response> {
|
) -> Result<create_receipt::Response> {
|
||||||
let request =
|
let request =
|
||||||
create_receipt::Request::new(room_id, create_receipt::ReceiptType::Read, event_id);
|
create_receipt::Request::new(room_id, create_receipt::ReceiptType::Read, event_id);
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a request to notify the room user has read up to specific event.
|
/// Send a request to notify the room user has read up to specific event.
|
||||||
|
@ -1129,7 +1136,7 @@ impl Client {
|
||||||
let request = assign!(set_read_marker::Request::new(room_id, fully_read), {
|
let request = assign!(set_read_marker::Request::new(room_id, fully_read), {
|
||||||
read_receipt
|
read_receipt
|
||||||
});
|
});
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Share a group session for the given room.
|
/// Share a group session for the given room.
|
||||||
|
@ -1260,7 +1267,7 @@ impl Client {
|
||||||
let txn_id = txn_id.unwrap_or_else(Uuid::new_v4).to_string();
|
let txn_id = txn_id.unwrap_or_else(Uuid::new_v4).to_string();
|
||||||
let request = send_message_event::Request::new(&room_id, &txn_id, &content);
|
let request = send_message_event::Request::new(&room_id, &txn_id, &content);
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1447,11 +1454,16 @@ impl Client {
|
||||||
let mut data = Vec::new();
|
let mut data = Vec::new();
|
||||||
reader.read_to_end(&mut data)?;
|
reader.read_to_end(&mut data)?;
|
||||||
|
|
||||||
|
let timeout = std::cmp::max(
|
||||||
|
Duration::from_secs(data.len() as u64 / DEFAULT_UPLOAD_SPEED),
|
||||||
|
MIN_UPLOAD_REQUEST_TIMEOUT,
|
||||||
|
);
|
||||||
|
|
||||||
let request = assign!(create_content::Request::new(data), {
|
let request = assign!(create_content::Request::new(data), {
|
||||||
content_type: Some(content_type.essence_str()),
|
content_type: Some(content_type.essence_str()),
|
||||||
});
|
});
|
||||||
|
|
||||||
self.http_client.upload(request).await
|
Ok(self.http_client.upload(request, Some(timeout)).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send an arbitrary request to the server, without updating client state.
|
/// Send an arbitrary request to the server, without updating client state.
|
||||||
|
@ -1465,6 +1477,9 @@ impl Client {
|
||||||
///
|
///
|
||||||
/// * `request` - A filled out and valid request for the endpoint to be hit
|
/// * `request` - A filled out and valid request for the endpoint to be hit
|
||||||
///
|
///
|
||||||
|
/// * `timeout` - An optional request timeout setting, this overrides the
|
||||||
|
/// default request setting if one was set.
|
||||||
|
///
|
||||||
/// # Example
|
/// # Example
|
||||||
///
|
///
|
||||||
/// ```no_run
|
/// ```no_run
|
||||||
|
@ -1485,18 +1500,22 @@ impl Client {
|
||||||
/// let request = profile::get_profile::Request::new(&user_id);
|
/// let request = profile::get_profile::Request::new(&user_id);
|
||||||
///
|
///
|
||||||
/// // Start the request using Client::send()
|
/// // Start the request using Client::send()
|
||||||
/// let response = client.send(request).await.unwrap();
|
/// let response = client.send(request, None).await.unwrap();
|
||||||
///
|
///
|
||||||
/// // Check the corresponding Response struct to find out what types are
|
/// // Check the corresponding Response struct to find out what types are
|
||||||
/// // returned
|
/// // returned
|
||||||
/// # })
|
/// # })
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn send<Request>(&self, request: Request) -> Result<Request::IncomingResponse>
|
pub async fn send<Request>(
|
||||||
|
&self,
|
||||||
|
request: Request,
|
||||||
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<Request::IncomingResponse>
|
||||||
where
|
where
|
||||||
Request: OutgoingRequest + Debug,
|
Request: OutgoingRequest + Debug,
|
||||||
Error: From<FromHttpResponseError<Request::EndpointError>>,
|
HttpError: From<FromHttpResponseError<Request::EndpointError>>,
|
||||||
{
|
{
|
||||||
self.http_client.send(request).await
|
Ok(self.http_client.send(request, timeout).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
|
@ -1511,7 +1530,7 @@ impl Client {
|
||||||
request.messages.clone(),
|
request.messages.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get information of all our own devices.
|
/// Get information of all our own devices.
|
||||||
|
@ -1540,7 +1559,7 @@ impl Client {
|
||||||
pub async fn devices(&self) -> Result<get_devices::Response> {
|
pub async fn devices(&self) -> Result<get_devices::Response> {
|
||||||
let request = get_devices::Request::new();
|
let request = get_devices::Request::new();
|
||||||
|
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete the given devices from the server.
|
/// Delete the given devices from the server.
|
||||||
|
@ -1605,13 +1624,13 @@ impl Client {
|
||||||
let mut request = delete_devices::Request::new(devices);
|
let mut request = delete_devices::Request::new(devices);
|
||||||
request.auth = auth_data;
|
request.auth = auth_data;
|
||||||
|
|
||||||
self.send(request).await
|
self.send(request, None).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the room members for the given room.
|
/// Get the room members for the given room.
|
||||||
pub async fn room_members(&self, room_id: &RoomId) -> Result<MembersResponse> {
|
pub async fn room_members(&self, room_id: &RoomId) -> Result<MembersResponse> {
|
||||||
let request = get_member_events::Request::new(room_id);
|
let request = get_member_events::Request::new(room_id);
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
|
|
||||||
Ok(self.base_client.receive_members(room_id, &response).await?)
|
Ok(self.base_client.receive_members(room_id, &response).await?)
|
||||||
}
|
}
|
||||||
|
@ -1637,7 +1656,12 @@ impl Client {
|
||||||
timeout: sync_settings.timeout,
|
timeout: sync_settings.timeout,
|
||||||
});
|
});
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let timeout = sync_settings
|
||||||
|
.timeout
|
||||||
|
.unwrap_or_else(|| Duration::from_secs(0))
|
||||||
|
+ SYNC_REQUEST_TIMEOUT;
|
||||||
|
|
||||||
|
let response = self.send(request, Some(timeout)).await?;
|
||||||
|
|
||||||
Ok(self.base_client.receive_sync_response(response).await?)
|
Ok(self.base_client.receive_sync_response(response).await?)
|
||||||
}
|
}
|
||||||
|
@ -1778,7 +1802,7 @@ impl Client {
|
||||||
}
|
}
|
||||||
OutgoingRequests::SignatureUpload(request) => {
|
OutgoingRequests::SignatureUpload(request) => {
|
||||||
// TODO remove this unwrap.
|
// TODO remove this unwrap.
|
||||||
if let Ok(resp) = self.send(request.clone()).await {
|
if let Ok(resp) = self.send(request.clone(), None).await {
|
||||||
self.base_client
|
self.base_client
|
||||||
.mark_request_as_sent(&r.request_id(), &resp)
|
.mark_request_as_sent(&r.request_id(), &resp)
|
||||||
.await
|
.await
|
||||||
|
@ -1838,7 +1862,7 @@ impl Client {
|
||||||
let _lock = self.key_claim_lock.lock().await;
|
let _lock = self.key_claim_lock.lock().await;
|
||||||
|
|
||||||
if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? {
|
if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? {
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
self.base_client
|
self.base_client
|
||||||
.mark_request_as_sent(&request_id, &response)
|
.mark_request_as_sent(&request_id, &response)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -1897,7 +1921,7 @@ impl Client {
|
||||||
request.one_time_keys.as_ref().map_or(0, |k| k.len())
|
request.one_time_keys.as_ref().map_or(0, |k| k.len())
|
||||||
);
|
);
|
||||||
|
|
||||||
let response = self.send(request.clone()).await?;
|
let response = self.send(request.clone(), None).await?;
|
||||||
self.base_client
|
self.base_client
|
||||||
.mark_request_as_sent(request_id, &response)
|
.mark_request_as_sent(request_id, &response)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -1926,7 +1950,7 @@ impl Client {
|
||||||
) -> Result<get_keys::Response> {
|
) -> Result<get_keys::Response> {
|
||||||
let request = assign!(get_keys::Request::new(), { device_keys });
|
let request = assign!(get_keys::Request::new(), { device_keys });
|
||||||
|
|
||||||
let response = self.send(request).await?;
|
let response = self.send(request, None).await?;
|
||||||
self.base_client
|
self.base_client
|
||||||
.mark_request_as_sent(request_id, &response)
|
.mark_request_as_sent(request_id, &response)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -2079,8 +2103,8 @@ impl Client {
|
||||||
user_signing_key: request.user_signing_key,
|
user_signing_key: request.user_signing_key,
|
||||||
});
|
});
|
||||||
|
|
||||||
self.send(request).await?;
|
self.send(request, None).await?;
|
||||||
self.send(signature_request).await?;
|
self.send(signature_request, None).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -2276,7 +2300,7 @@ impl Client {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use crate::ClientConfig;
|
use crate::{ClientConfig, HttpError};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
get_public_rooms, get_public_rooms_filtered, register::RegistrationKind, Client,
|
get_public_rooms, get_public_rooms_filtered, register::RegistrationKind, Client,
|
||||||
|
@ -2471,12 +2495,12 @@ mod test {
|
||||||
.create();
|
.create();
|
||||||
|
|
||||||
if let Err(err) = client.login("example", "wordpass", None, None).await {
|
if let Err(err) = client.login("example", "wordpass", None, None).await {
|
||||||
if let crate::Error::RumaResponse(crate::FromHttpResponseError::Http(
|
if let crate::Error::Http(HttpError::FromHttpResponse(
|
||||||
crate::ServerError::Known(crate::api::Error {
|
crate::FromHttpResponseError::Http(crate::ServerError::Known(crate::api::Error {
|
||||||
kind,
|
kind,
|
||||||
message,
|
message,
|
||||||
status_code,
|
status_code,
|
||||||
}),
|
})),
|
||||||
)) = err
|
)) = err
|
||||||
{
|
{
|
||||||
if let crate::api::error::ErrorKind::Forbidden = kind {
|
if let crate::api::error::ErrorKind::Forbidden = kind {
|
||||||
|
@ -2517,10 +2541,10 @@ mod test {
|
||||||
});
|
});
|
||||||
|
|
||||||
if let Err(err) = client.register(user).await {
|
if let Err(err) = client.register(user).await {
|
||||||
if let crate::Error::UiaaError(crate::FromHttpResponseError::Http(
|
if let crate::Error::Http(HttpError::UiaaError(crate::FromHttpResponseError::Http(
|
||||||
// TODO this should be a UiaaError need to investigate
|
// TODO this should be a UiaaError need to investigate
|
||||||
crate::ServerError::Unknown(e),
|
crate::ServerError::Unknown(e),
|
||||||
)) = err
|
))) = err
|
||||||
{
|
{
|
||||||
assert!(e.to_string().starts_with("EOF while parsing"))
|
assert!(e.to_string().starts_with("EOF while parsing"))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -14,13 +14,14 @@
|
||||||
|
|
||||||
//! Error conditions.
|
//! Error conditions.
|
||||||
|
|
||||||
|
use http::StatusCode;
|
||||||
use matrix_sdk_base::{Error as MatrixError, StoreError};
|
use matrix_sdk_base::{Error as MatrixError, StoreError};
|
||||||
use matrix_sdk_common::{
|
use matrix_sdk_common::{
|
||||||
api::{
|
api::{
|
||||||
r0::uiaa::{UiaaInfo, UiaaResponse as UiaaError},
|
r0::uiaa::{UiaaInfo, UiaaResponse as UiaaError},
|
||||||
Error as RumaClientError,
|
Error as RumaClientError,
|
||||||
},
|
},
|
||||||
FromHttpResponseError as RumaResponseError, IntoHttpError as RumaIntoHttpError, ServerError,
|
FromHttpResponseError, IntoHttpError, ServerError,
|
||||||
};
|
};
|
||||||
use reqwest::Error as ReqwestError;
|
use reqwest::Error as ReqwestError;
|
||||||
use serde_json::Error as JsonError;
|
use serde_json::Error as JsonError;
|
||||||
|
@ -33,9 +34,14 @@ use matrix_sdk_base::crypto::store::CryptoStoreError;
|
||||||
/// Result type of the rust-sdk.
|
/// Result type of the rust-sdk.
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
/// Internal representation of errors.
|
/// An HTTP error, representing either a connection error or an error while
|
||||||
|
/// converting the raw HTTP response into a Matrix response.
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum Error {
|
pub enum HttpError {
|
||||||
|
/// An error at the HTTP layer.
|
||||||
|
#[error(transparent)]
|
||||||
|
Reqwest(#[from] ReqwestError),
|
||||||
|
|
||||||
/// Queried endpoint requires authentication but was called on an anonymous client.
|
/// Queried endpoint requires authentication but was called on an anonymous client.
|
||||||
#[error("the queried endpoint requires authentication but was called before logging in")]
|
#[error("the queried endpoint requires authentication but was called before logging in")]
|
||||||
AuthenticationRequired,
|
AuthenticationRequired,
|
||||||
|
@ -44,9 +50,41 @@ pub enum Error {
|
||||||
#[error("the queried endpoint is not meant for clients")]
|
#[error("the queried endpoint is not meant for clients")]
|
||||||
NotClientRequest,
|
NotClientRequest,
|
||||||
|
|
||||||
/// An error at the HTTP layer.
|
/// An error converting between ruma_client_api types and Hyper types.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Reqwest(#[from] ReqwestError),
|
FromHttpResponse(#[from] FromHttpResponseError<RumaClientError>),
|
||||||
|
|
||||||
|
/// An error converting between ruma_client_api types and Hyper types.
|
||||||
|
#[error(transparent)]
|
||||||
|
IntoHttp(#[from] IntoHttpError),
|
||||||
|
|
||||||
|
/// An error occurred while authenticating.
|
||||||
|
///
|
||||||
|
/// When registering or authenticating the Matrix server can send a `UiaaResponse`
|
||||||
|
/// as the error type, this is a User-Interactive Authentication API response. This
|
||||||
|
/// represents an error with information about how to authenticate the user.
|
||||||
|
#[error(transparent)]
|
||||||
|
UiaaError(#[from] FromHttpResponseError<UiaaError>),
|
||||||
|
|
||||||
|
/// The server returned a status code that should be retried.
|
||||||
|
#[error("Server returned an error {0}")]
|
||||||
|
Server(StatusCode),
|
||||||
|
|
||||||
|
/// The given request can't be cloned and thus can't be retried.
|
||||||
|
#[error("The request cannot be cloned")]
|
||||||
|
UnableToCloneRequest,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Internal representation of errors.
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum Error {
|
||||||
|
/// Error doing an HTTP request.
|
||||||
|
#[error(transparent)]
|
||||||
|
Http(#[from] HttpError),
|
||||||
|
|
||||||
|
/// Queried endpoint requires authentication but was called on an anonymous client.
|
||||||
|
#[error("the queried endpoint requires authentication but was called before logging in")]
|
||||||
|
AuthenticationRequired,
|
||||||
|
|
||||||
/// An error de/serializing type for the `StateStore`
|
/// An error de/serializing type for the `StateStore`
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
|
@ -56,14 +94,6 @@ pub enum Error {
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
IO(#[from] IoError),
|
IO(#[from] IoError),
|
||||||
|
|
||||||
/// An error converting between ruma_client_api types and Hyper types.
|
|
||||||
#[error("can't parse the JSON response as a Matrix response")]
|
|
||||||
RumaResponse(RumaResponseError<RumaClientError>),
|
|
||||||
|
|
||||||
/// An error converting between ruma_client_api types and Hyper types.
|
|
||||||
#[error("can't convert between ruma_client_api and hyper types.")]
|
|
||||||
IntoHttp(RumaIntoHttpError),
|
|
||||||
|
|
||||||
/// An error occurred in the Matrix client library.
|
/// An error occurred in the Matrix client library.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
MatrixError(#[from] MatrixError),
|
MatrixError(#[from] MatrixError),
|
||||||
|
@ -76,14 +106,6 @@ pub enum Error {
|
||||||
/// An error occured in the state store.
|
/// An error occured in the state store.
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
StateStore(#[from] StoreError),
|
StateStore(#[from] StoreError),
|
||||||
|
|
||||||
/// An error occurred while authenticating.
|
|
||||||
///
|
|
||||||
/// When registering or authenticating the Matrix server can send a `UiaaResponse`
|
|
||||||
/// as the error type, this is a User-Interactive Authentication API response. This
|
|
||||||
/// represents an error with information about how to authenticate the user.
|
|
||||||
#[error("User-Interactive Authentication required.")]
|
|
||||||
UiaaError(RumaResponseError<UiaaError>),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
|
@ -99,9 +121,9 @@ impl Error {
|
||||||
/// This method is an convenience method to get to the info the server
|
/// This method is an convenience method to get to the info the server
|
||||||
/// returned on the first, failed request.
|
/// returned on the first, failed request.
|
||||||
pub fn uiaa_response(&self) -> Option<&UiaaInfo> {
|
pub fn uiaa_response(&self) -> Option<&UiaaInfo> {
|
||||||
if let Error::UiaaError(RumaResponseError::Http(ServerError::Known(
|
if let Error::Http(HttpError::UiaaError(FromHttpResponseError::Http(ServerError::Known(
|
||||||
UiaaError::AuthResponse(i),
|
UiaaError::AuthResponse(i),
|
||||||
))) = self
|
)))) = self
|
||||||
{
|
{
|
||||||
Some(i)
|
Some(i)
|
||||||
} else {
|
} else {
|
||||||
|
@ -110,20 +132,8 @@ impl Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<RumaResponseError<UiaaError>> for Error {
|
impl From<ReqwestError> for Error {
|
||||||
fn from(error: RumaResponseError<UiaaError>) -> Self {
|
fn from(e: ReqwestError) -> Self {
|
||||||
Self::UiaaError(error)
|
Error::Http(HttpError::Reqwest(e))
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<RumaResponseError<RumaClientError>> for Error {
|
|
||||||
fn from(error: RumaResponseError<RumaClientError>) -> Self {
|
|
||||||
Self::RumaResponse(error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<RumaIntoHttpError> for Error {
|
|
||||||
fn from(error: RumaIntoHttpError) -> Self {
|
|
||||||
Self::IntoHttp(error)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,17 +14,26 @@
|
||||||
|
|
||||||
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
|
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
|
||||||
|
|
||||||
|
#[cfg(all(not(test), not(target_arch = "wasm32")))]
|
||||||
|
use backoff::{future::retry, Error as RetryError, ExponentialBackoff};
|
||||||
|
#[cfg(all(not(test), not(target_arch = "wasm32")))]
|
||||||
|
use http::StatusCode;
|
||||||
use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse};
|
use http::{HeaderValue, Method as HttpMethod, Response as HttpResponse};
|
||||||
use reqwest::{Client, Response};
|
use reqwest::{Client, Response};
|
||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use matrix_sdk_common::{
|
use matrix_sdk_common::{
|
||||||
api::r0::media::create_content, async_trait, locks::RwLock, AsyncTraitDeps, AuthScheme,
|
api::r0::media::create_content, async_trait, instant::Duration, locks::RwLock, AsyncTraitDeps,
|
||||||
FromHttpResponseError,
|
AuthScheme, FromHttpResponseError,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{ClientConfig, Error, OutgoingRequest, Result, Session};
|
use crate::{error::HttpError, ClientConfig, OutgoingRequest, Session};
|
||||||
|
|
||||||
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
|
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
|
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
/// Abstraction around the http layer. The allows implementors to use different
|
/// Abstraction around the http layer. The allows implementors to use different
|
||||||
/// http libraries.
|
/// http libraries.
|
||||||
|
@ -43,7 +52,8 @@ pub trait HttpSend: AsyncTraitDeps {
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// use std::convert::TryFrom;
|
/// use std::convert::TryFrom;
|
||||||
/// use matrix_sdk::{HttpSend, Result, async_trait};
|
/// use matrix_sdk::{HttpSend, async_trait, HttpError};
|
||||||
|
/// # use std::time::Duration;
|
||||||
///
|
///
|
||||||
/// #[derive(Debug)]
|
/// #[derive(Debug)]
|
||||||
/// struct Client(reqwest::Client);
|
/// struct Client(reqwest::Client);
|
||||||
|
@ -52,7 +62,7 @@ pub trait HttpSend: AsyncTraitDeps {
|
||||||
/// async fn response_to_http_response(
|
/// async fn response_to_http_response(
|
||||||
/// &self,
|
/// &self,
|
||||||
/// mut response: reqwest::Response,
|
/// mut response: reqwest::Response,
|
||||||
/// ) -> Result<http::Response<Vec<u8>>> {
|
/// ) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||||
/// // Convert the reqwest response to a http one.
|
/// // Convert the reqwest response to a http one.
|
||||||
/// todo!()
|
/// todo!()
|
||||||
/// }
|
/// }
|
||||||
|
@ -60,7 +70,11 @@ pub trait HttpSend: AsyncTraitDeps {
|
||||||
///
|
///
|
||||||
/// #[async_trait]
|
/// #[async_trait]
|
||||||
/// impl HttpSend for Client {
|
/// impl HttpSend for Client {
|
||||||
/// async fn send_request(&self, request: http::Request<Vec<u8>>) -> Result<http::Response<Vec<u8>>> {
|
/// async fn send_request(
|
||||||
|
/// &self,
|
||||||
|
/// request: http::Request<Vec<u8>>,
|
||||||
|
/// timeout: Option<Duration>,
|
||||||
|
/// ) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||||
/// Ok(self
|
/// Ok(self
|
||||||
/// .response_to_http_response(
|
/// .response_to_http_response(
|
||||||
/// self.0
|
/// self.0
|
||||||
|
@ -74,7 +88,8 @@ pub trait HttpSend: AsyncTraitDeps {
|
||||||
async fn send_request(
|
async fn send_request(
|
||||||
&self,
|
&self,
|
||||||
request: http::Request<Vec<u8>>,
|
request: http::Request<Vec<u8>>,
|
||||||
) -> Result<http::Response<Vec<u8>>>;
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<http::Response<Vec<u8>>, HttpError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -90,7 +105,8 @@ impl HttpClient {
|
||||||
request: Request,
|
request: Request,
|
||||||
session: Arc<RwLock<Option<Session>>>,
|
session: Arc<RwLock<Option<Session>>>,
|
||||||
content_type: Option<HeaderValue>,
|
content_type: Option<HeaderValue>,
|
||||||
) -> Result<http::Response<Vec<u8>>> {
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||||
let mut request = {
|
let mut request = {
|
||||||
let read_guard;
|
let read_guard;
|
||||||
let access_token = match Request::METADATA.authentication {
|
let access_token = match Request::METADATA.authentication {
|
||||||
|
@ -100,11 +116,11 @@ impl HttpClient {
|
||||||
if let Some(session) = read_guard.as_ref() {
|
if let Some(session) = read_guard.as_ref() {
|
||||||
Some(session.access_token.as_str())
|
Some(session.access_token.as_str())
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::AuthenticationRequired);
|
return Err(HttpError::AuthenticationRequired);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
AuthScheme::None => None,
|
AuthScheme::None => None,
|
||||||
_ => return Err(Error::NotClientRequest),
|
_ => return Err(HttpError::NotClientRequest),
|
||||||
};
|
};
|
||||||
|
|
||||||
request.try_into_http_request(&self.homeserver.to_string(), access_token)?
|
request.try_into_http_request(&self.homeserver.to_string(), access_token)?
|
||||||
|
@ -118,44 +134,51 @@ impl HttpClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.inner.send_request(request).await
|
self.inner.send_request(request, timeout).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn upload(
|
pub async fn upload(
|
||||||
&self,
|
&self,
|
||||||
request: create_content::Request<'_>,
|
request: create_content::Request<'_>,
|
||||||
) -> Result<create_content::Response> {
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<create_content::Response, HttpError> {
|
||||||
let response = self
|
let response = self
|
||||||
.send_request(request, self.session.clone(), None)
|
.send_request(request, self.session.clone(), None, timeout)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(create_content::Response::try_from(response)?)
|
Ok(create_content::Response::try_from(response)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send<Request>(&self, request: Request) -> Result<Request::IncomingResponse>
|
pub async fn send<Request>(
|
||||||
|
&self,
|
||||||
|
request: Request,
|
||||||
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<Request::IncomingResponse, HttpError>
|
||||||
where
|
where
|
||||||
Request: OutgoingRequest,
|
Request: OutgoingRequest + Debug,
|
||||||
Error: From<FromHttpResponseError<Request::EndpointError>>,
|
HttpError: From<FromHttpResponseError<Request::EndpointError>>,
|
||||||
{
|
{
|
||||||
let content_type = HeaderValue::from_static("application/json");
|
let content_type = HeaderValue::from_static("application/json");
|
||||||
let response = self
|
let response = self
|
||||||
.send_request(request, self.session.clone(), Some(content_type))
|
.send_request(request, self.session.clone(), Some(content_type), timeout)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
trace!("Got response: {:?}", response);
|
trace!("Got response: {:?}", response);
|
||||||
|
|
||||||
Ok(Request::IncomingResponse::try_from(response)?)
|
let response = Request::IncomingResponse::try_from(response)?;
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build a client with the specified configuration.
|
/// Build a client with the specified configuration.
|
||||||
pub(crate) fn client_with_config(config: &ClientConfig) -> Result<Client> {
|
pub(crate) fn client_with_config(config: &ClientConfig) -> Result<Client, HttpError> {
|
||||||
let http_client = reqwest::Client::builder();
|
let http_client = reqwest::Client::builder();
|
||||||
|
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
let http_client = {
|
let http_client = {
|
||||||
let http_client = match config.timeout {
|
let http_client = match config.timeout {
|
||||||
Some(x) => http_client.timeout(x),
|
Some(x) => http_client.timeout(x),
|
||||||
None => http_client,
|
None => http_client.timeout(DEFAULT_REQUEST_TIMEOUT),
|
||||||
};
|
};
|
||||||
|
|
||||||
let http_client = if config.disable_ssl_verification {
|
let http_client = if config.disable_ssl_verification {
|
||||||
|
@ -173,12 +196,15 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result<Client> {
|
||||||
|
|
||||||
let user_agent = match &config.user_agent {
|
let user_agent = match &config.user_agent {
|
||||||
Some(a) => a.clone(),
|
Some(a) => a.clone(),
|
||||||
None => HeaderValue::from_str(&format!("matrix-rust-sdk {}", crate::VERSION)).unwrap(),
|
None => HeaderValue::from_str(&format!("matrix-rust-sdk {}", crate::VERSION))
|
||||||
|
.expect("Can't construct the version header"),
|
||||||
};
|
};
|
||||||
|
|
||||||
headers.insert(reqwest::header::USER_AGENT, user_agent);
|
headers.insert(reqwest::header::USER_AGENT, user_agent);
|
||||||
|
|
||||||
http_client.default_headers(headers)
|
http_client
|
||||||
|
.default_headers(headers)
|
||||||
|
.connect_timeout(DEFAULT_CONNECTION_TIMEOUT)
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
@ -188,11 +214,15 @@ pub(crate) fn client_with_config(config: &ClientConfig) -> Result<Client> {
|
||||||
Ok(http_client.build()?)
|
Ok(http_client.build()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn response_to_http_response(mut response: Response) -> Result<http::Response<Vec<u8>>> {
|
async fn response_to_http_response(
|
||||||
|
mut response: Response,
|
||||||
|
) -> Result<http::Response<Vec<u8>>, reqwest::Error> {
|
||||||
let status = response.status();
|
let status = response.status();
|
||||||
|
|
||||||
let mut http_builder = HttpResponse::builder().status(status);
|
let mut http_builder = HttpResponse::builder().status(status);
|
||||||
let headers = http_builder.headers_mut().unwrap();
|
let headers = http_builder
|
||||||
|
.headers_mut()
|
||||||
|
.expect("Can't get the response builder headers");
|
||||||
|
|
||||||
for (k, v) in response.headers_mut().drain() {
|
for (k, v) in response.headers_mut().drain() {
|
||||||
if let Some(key) = k {
|
if let Some(key) = k {
|
||||||
|
@ -202,7 +232,63 @@ async fn response_to_http_response(mut response: Response) -> Result<http::Respo
|
||||||
|
|
||||||
let body = response.bytes().await?.as_ref().to_owned();
|
let body = response.bytes().await?.as_ref().to_owned();
|
||||||
|
|
||||||
Ok(http_builder.body(body).unwrap())
|
Ok(http_builder
|
||||||
|
.body(body)
|
||||||
|
.expect("Can't construct a response using the given body"))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(any(test, target_arch = "wasm32"))]
|
||||||
|
async fn send_request(
|
||||||
|
client: &Client,
|
||||||
|
request: http::Request<Vec<u8>>,
|
||||||
|
_: Option<Duration>,
|
||||||
|
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||||
|
let request = reqwest::Request::try_from(request)?;
|
||||||
|
let response = client.execute(request).await?;
|
||||||
|
|
||||||
|
Ok(response_to_http_response(response).await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(not(test), not(target_arch = "wasm32")))]
|
||||||
|
async fn send_request(
|
||||||
|
client: &Client,
|
||||||
|
request: http::Request<Vec<u8>>,
|
||||||
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||||
|
let backoff = ExponentialBackoff::default();
|
||||||
|
let mut request = reqwest::Request::try_from(request)?;
|
||||||
|
|
||||||
|
if let Some(timeout) = timeout {
|
||||||
|
*request.timeout_mut() = Some(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
let request = &request;
|
||||||
|
|
||||||
|
let request = || async move {
|
||||||
|
let request = request.try_clone().ok_or(HttpError::UnableToCloneRequest)?;
|
||||||
|
|
||||||
|
let response = client
|
||||||
|
.execute(request)
|
||||||
|
.await
|
||||||
|
.map_err(|e| RetryError::Transient(HttpError::Reqwest(e)))?;
|
||||||
|
|
||||||
|
let status_code = response.status();
|
||||||
|
// TODO TOO_MANY_REQUESTS will have a retry timeout which we should
|
||||||
|
// use.
|
||||||
|
if status_code.is_server_error() || response.status() == StatusCode::TOO_MANY_REQUESTS {
|
||||||
|
return Err(RetryError::Transient(HttpError::Server(status_code)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = response_to_http_response(response)
|
||||||
|
.await
|
||||||
|
.map_err(|e| RetryError::Permanent(HttpError::Reqwest(e)))?;
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = retry(backoff, request).await?;
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||||
|
@ -211,10 +297,8 @@ impl HttpSend for Client {
|
||||||
async fn send_request(
|
async fn send_request(
|
||||||
&self,
|
&self,
|
||||||
request: http::Request<Vec<u8>>,
|
request: http::Request<Vec<u8>>,
|
||||||
) -> Result<http::Response<Vec<u8>>> {
|
timeout: Option<Duration>,
|
||||||
Ok(
|
) -> Result<http::Response<Vec<u8>>, HttpError> {
|
||||||
response_to_http_response(self.execute(reqwest::Request::try_from(request)?).await?)
|
send_request(&self, request, timeout).await
|
||||||
.await?,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,7 @@ pub use client::{Client, ClientConfig, LoopCtrl, SyncSettings};
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
|
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
|
||||||
pub use device::Device;
|
pub use device::Device;
|
||||||
pub use error::{Error, Result};
|
pub use error::{Error, HttpError, Result};
|
||||||
pub use http_client::HttpSend;
|
pub use http_client::HttpSend;
|
||||||
#[cfg(feature = "encryption")]
|
#[cfg(feature = "encryption")]
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
|
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
|
||||||
|
|
|
@ -54,7 +54,7 @@ impl Sas {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(s) = signature {
|
if let Some(s) = signature {
|
||||||
self.client.send(s).await?;
|
self.client.send(s, None).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
Loading…
Reference in New Issue