chore(sdk): Move the e2ee related client methods to the encryption module

master
Damir Jelić 2021-09-14 13:10:22 +02:00
parent e8c5b0766e
commit 29d11db73a
2 changed files with 720 additions and 724 deletions

View File

@ -13,8 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
use std::path::PathBuf;
use std::{
collections::BTreeMap,
fmt::{self, Debug},
@ -31,11 +29,6 @@ use std::{
io::{Error as IoError, ErrorKind as IoErrorKind},
ops::Range,
};
#[cfg(feature = "encryption")]
use std::{
collections::HashSet,
io::{Cursor, Write},
};
use dashmap::DashMap;
use futures::FutureExt;
@ -43,74 +36,20 @@ use futures_timer::Delay as sleep;
use http::HeaderValue;
#[cfg(feature = "sso_login")]
use http::Response;
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
use matrix_sdk_base::crypto::{decrypt_key_export, encrypt_key_export, olm::InboundGroupSession};
#[cfg(feature = "encryption")]
use matrix_sdk_base::crypto::{
store::CryptoStoreError, AttachmentDecryptor, OutgoingRequests, RoomMessageRequest,
ToDeviceRequest,
};
#[cfg(feature = "encryption")]
use matrix_sdk_base::{crypto::CrossSigningStatus, deserialized_responses::RoomEvent};
use matrix_sdk_base::{
deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse},
media::{MediaEventContent, MediaFormat, MediaRequest, MediaThumbnailSize, MediaType},
BaseClient, BaseClientConfig, Session, Store,
};
use mime::{self, Mime};
#[cfg(feature = "sso_login")]
use rand::{thread_rng, Rng};
use reqwest::header::InvalidHeaderValue;
#[cfg(feature = "encryption")]
use ruma::events::{AnyMessageEvent, AnyRoomEvent, AnySyncMessageEvent, EventType};
use ruma::{
api::{client::r0::push::get_notifications::Notification, SendAccessToken},
events::AnyMessageEventContent,
MxcUri,
};
use serde::de::DeserializeOwned;
#[cfg(feature = "sso_login")]
use tokio::{net::TcpListener, sync::oneshot};
#[cfg(feature = "sso_login")]
use tokio_stream::wrappers::TcpListenerStream;
#[cfg(feature = "encryption")]
use tracing::{debug, trace};
use tracing::{error, info, instrument, warn};
use url::Url;
#[cfg(feature = "sso_login")]
use warp::Filter;
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
use zeroize::Zeroizing;
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method, the return value
/// of the provided callback controls if the sync loop should be exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
/// Continue running the loop.
Continue,
/// Break out of the loop.
Break,
}
use matrix_sdk_common::{
instant::{Duration, Instant},
locks::{Mutex, RwLock},
uuid::Uuid,
};
#[cfg(feature = "encryption")]
use ruma::{
api::client::r0::{
keys::{get_keys, upload_keys, upload_signing_keys::Request as UploadSigningKeysRequest},
to_device::send_event_to_device::{
Request as RumaToDeviceRequest, Response as ToDeviceResponse,
},
},
DeviceId,
};
use mime::{self, Mime};
#[cfg(feature = "sso_login")]
use rand::{thread_rng, Rng};
use reqwest::header::InvalidHeaderValue;
use ruma::{
api::{
client::{
@ -123,6 +62,7 @@ use ruma::{
membership::{join_room_by_id, join_room_by_id_or_alias},
message::send_message_event,
profile::{get_avatar_url, get_display_name, set_avatar_url, set_display_name},
push::get_notifications::Notification,
room::create_room,
session::{get_login_types, login, sso_login},
sync::sync_events,
@ -131,23 +71,23 @@ use ruma::{
unversioned::{discover_homeserver, get_supported_versions},
},
error::FromHttpResponseError,
OutgoingRequest,
OutgoingRequest, SendAccessToken,
},
assign,
events::AnyMessageEventContent,
presence::PresenceState,
DeviceIdBox, RoomId, RoomIdOrAliasId, ServerName, UInt, UserId,
DeviceIdBox, MxcUri, RoomId, RoomIdOrAliasId, ServerName, UInt, UserId,
};
use serde::de::DeserializeOwned;
#[cfg(feature = "sso_login")]
use tokio::{net::TcpListener, sync::oneshot};
#[cfg(feature = "sso_login")]
use tokio_stream::wrappers::TcpListenerStream;
use tracing::{error, info, instrument, warn};
use url::Url;
#[cfg(feature = "sso_login")]
use warp::Filter;
#[cfg(all(feature = "qrcode", feature = "encryption"))]
use crate::encryption::verification::QrVerification;
#[cfg(feature = "encryption")]
use crate::{
encryption::{
identities::{Device, UserDevices},
verification::{SasVerification, Verification, VerificationRequest},
},
error::RoomKeyImportError,
};
use crate::{
error::{HttpError, HttpResult},
event_handler::{EventHandler, EventHandlerData, EventHandlerResult, EventKind, SyncEvent},
@ -176,6 +116,20 @@ type NotificationHandlerFut = EventHandlerFut;
type NotificationHandlerFn =
Box<dyn Fn(Notification, room::Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method, the return value
/// of the provided callback controls if the sync loop should be exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
/// Continue running the loop.
Continue,
/// Break out of the loop.
Break,
}
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
@ -193,7 +147,7 @@ pub struct Client {
pub(crate) group_session_locks: Arc<DashMap<RoomId, Arc<Mutex<()>>>>,
#[cfg(feature = "encryption")]
/// Lock making sure we're only doing one key claim request at a time.
key_claim_lock: Arc<Mutex<()>>,
pub(crate) key_claim_lock: Arc<Mutex<()>>,
pub(crate) members_request_locks: Arc<DashMap<RoomId, Arc<Mutex<()>>>>,
pub(crate) typing_notice_times: Arc<DashMap<RoomId, Instant>>,
/// Event handlers. See `register_event_handler`.
@ -209,7 +163,7 @@ pub struct Client {
/// synchronization, e.g. if we send out a request to create a room, we can
/// wait for the sync to get the data to fetch a room object from the state
/// store.
sync_beat: Arc<event_listener::Event>,
pub(crate) sync_beat: Arc<event_listener::Event>,
}
#[cfg(not(tarpaulin_include))]
@ -722,24 +676,6 @@ impl Client {
session.as_ref().map(|s| s.device_id.clone())
}
/// Get the public ed25519 key of our own device. This is usually what is
/// called the fingerprint of the device.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn ed25519_key(&self) -> Option<String> {
self.base_client.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned())
}
/// Get all the tracked users we know about
///
/// Tracked users are users for which we keep the device list of E2EE
/// capable devices up to date.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn tracked_users(&self) -> HashSet<UserId> {
self.base_client.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default()
}
/// Fetches the display name of the owner of the client.
///
/// # Example
@ -1125,40 +1061,6 @@ impl Client {
self.store().get_room(room_id).and_then(|room| room::Left::new(self.clone(), room))
}
/// Tries to decrypt a `AnyRoomEvent`. Returns unencrypted room event when
/// decryption fails.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub(crate) async fn decrypt_room_event(&self, event: &AnyRoomEvent) -> RoomEvent {
if let Some(machine) = self.base_client.olm_machine().await {
if let AnyRoomEvent::Message(event) = event {
if let AnyMessageEvent::RoomEncrypted(_) = event {
let room_id = event.room_id();
// Turn the AnyMessageEvent into a AnySyncMessageEvent
let event = event.clone().into();
if let AnySyncMessageEvent::RoomEncrypted(e) = event {
if let Ok(decrypted) = machine.decrypt_room_event(&e, room_id).await {
let event = decrypted
.event
.deserialize()
.unwrap()
.into_full_event(room_id.clone())
.into();
let encryption_info = decrypted.encryption_info;
// Return decrytped room event
return RoomEvent { event, encryption_info };
}
}
}
}
}
// Fallback to unencrypted room event
RoomEvent { event: event.into(), encryption_info: None }
}
/// Gets the homeservers supported login types.
///
/// This should be the first step when trying to login so you can call the
@ -1817,21 +1719,6 @@ impl Client {
self.send(request, None).await
}
#[cfg(feature = "encryption")]
pub(crate) async fn room_send_helper(
&self,
request: &RoomMessageRequest,
) -> Result<send_message_event::Response> {
let content = request.content.clone();
let txn_id = request.txn_id;
let room_id = &request.room_id;
self.get_joined_room(room_id)
.expect("Can't send a message to a room that isn't known to the store")
.send(content, Some(txn_id))
.await
}
/// Upload some media to the server.
///
/// # Arguments
@ -2002,22 +1889,6 @@ impl Client {
Ok(self.http_client.send(request, config).await?)
}
#[cfg(feature = "encryption")]
pub(crate) async fn send_to_device(
&self,
request: &ToDeviceRequest,
) -> HttpResult<ToDeviceResponse> {
let txn_id_string = request.txn_id_string();
let request = RumaToDeviceRequest::new_raw(
request.event_type.as_str(),
&txn_id_string,
request.messages.clone(),
);
self.send(request, None).await
}
/// Get information of all our own devices.
///
/// # Examples
@ -2326,6 +2197,8 @@ impl Client {
#[cfg(feature = "encryption")]
{
use matrix_sdk_base::crypto::OutgoingRequests;
// This is needed because sometimes we need to automatically
// claim some one-time keys to unwedge an existing Olm session.
if let Err(e) = self.claim_one_time_keys([].iter()).await {
@ -2417,554 +2290,12 @@ impl Client {
}
}
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
async fn send_account_data(
&self,
content: ruma::events::AnyGlobalAccountDataEventContent,
) -> Result<ruma::api::client::r0::config::set_global_account_data::Response> {
let own_user =
self.user_id().await.ok_or_else(|| Error::from(HttpError::AuthenticationRequired))?;
let data = serde_json::value::to_raw_value(&content)?;
let request = ruma::api::client::r0::config::set_global_account_data::Request::new(
&data,
ruma::events::EventContent::event_type(&content),
&own_user,
);
Ok(self.send(request, None).await?)
}
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub(crate) async fn create_dm_room(&self, user_id: UserId) -> Result<Option<room::Joined>> {
use ruma::{
api::client::r0::room::create_room::RoomPreset,
events::AnyGlobalAccountDataEventContent,
};
const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
// First we create the DM room, where we invite the user and tell the
// invitee that the room should be a DM.
let invite = &[user_id.clone()];
let request = assign!(
ruma::api::client::r0::room::create_room::Request::new(),
{
invite,
is_direct: true,
preset: Some(RoomPreset::TrustedPrivateChat),
}
);
let response = self.send(request, None).await?;
// Now we need to mark the room as a DM for ourselves, we fetch the
// existing `m.direct` event and append the room to the list of DMs we
// have with this user.
let mut content = self
.store()
.get_account_data_event(EventType::Direct)
.await?
.map(|e| e.deserialize())
.transpose()?
.and_then(|e| {
if let AnyGlobalAccountDataEventContent::Direct(c) = e.content() {
Some(c)
} else {
None
}
})
.unwrap_or_else(|| ruma::events::direct::DirectEventContent(BTreeMap::new()));
content.entry(user_id.to_owned()).or_default().push(response.room_id.to_owned());
// TODO We should probably save the fact that we need to send this out
// because otherwise we might end up in a state where we have a DM that
// isn't marked as one.
self.send_account_data(AnyGlobalAccountDataEventContent::Direct(content)).await?;
// If the room is already in our store, fetch it, otherwise wait for a
// sync to be done which should put the room into our store.
if let Some(room) = self.get_joined_room(&response.room_id) {
Ok(Some(room))
} else {
self.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME);
Ok(self.get_joined_room(&response.room_id))
}
}
/// Claim one-time keys creating new Olm sessions.
///
/// # Arguments
///
/// * `users` - The list of user/device pairs that we should claim keys for.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
#[instrument(skip(users))]
pub(crate) async fn claim_one_time_keys(
&self,
users: impl Iterator<Item = &UserId>,
) -> Result<()> {
let _lock = self.key_claim_lock.lock().await;
if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? {
let response = self.send(request, None).await?;
self.base_client.mark_request_as_sent(&request_id, &response).await?;
}
Ok(())
}
/// Upload the E2E encryption keys.
///
/// This uploads the long lived device keys as well as the required amount
/// of one-time keys.
///
/// # Panics
///
/// Panics if the client isn't logged in, or if no encryption keys need to
/// be uploaded.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
#[instrument]
async fn keys_upload(
&self,
request_id: &Uuid,
request: &upload_keys::Request,
) -> Result<upload_keys::Response> {
debug!(
"Uploading encryption keys device keys: {}, one-time-keys: {}",
request.device_keys.is_some(),
request.one_time_keys.as_ref().map_or(0, |k| k.len())
);
let response = self.send(request.clone(), None).await?;
self.base_client.mark_request_as_sent(request_id, &response).await?;
Ok(response)
}
/// Get the current, if any, sync token of the client.
/// This will be None if the client didn't sync at least once.
pub async fn sync_token(&self) -> Option<String> {
self.base_client.sync_token().await
}
/// Query the server for users device keys.
///
/// # Panics
///
/// Panics if no key query needs to be done.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
#[instrument]
async fn keys_query(
&self,
request_id: &Uuid,
device_keys: BTreeMap<UserId, Vec<DeviceIdBox>>,
) -> Result<get_keys::Response> {
let request = assign!(get_keys::Request::new(), { device_keys });
let response = self.send(request, None).await?;
self.base_client.mark_request_as_sent(request_id, &response).await?;
Ok(response)
}
/// Get a verification object with the given flow id.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
let olm = self.base_client.olm_machine().await?;
olm.get_verification(user_id, flow_id).map(|v| match v {
matrix_sdk_base::crypto::Verification::SasV1(s) => {
SasVerification { inner: s, client: self.clone() }.into()
}
#[cfg(feature = "qrcode")]
matrix_sdk_base::crypto::Verification::QrV1(qr) => {
QrVerification { inner: qr, client: self.clone() }.into()
}
})
}
/// Get a `VerificationRequest` object for the given user with the given
/// flow id.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_verification_request(
&self,
user_id: &UserId,
flow_id: impl AsRef<str>,
) -> Option<VerificationRequest> {
let olm = self.base_client.olm_machine().await?;
olm.get_verification_request(user_id, flow_id)
.map(|r| VerificationRequest { inner: r, client: self.clone() })
}
/// Get a specific device of a user.
///
/// # Arguments
///
/// * `user_id` - The unique id of the user that the device belongs to.
///
/// * `device_id` - The unique id of the device.
///
/// Returns a `Device` if one is found and the crypto store didn't throw an
/// error.
///
/// This will always return None if the client hasn't been logged in.
///
/// # Example
///
/// ```no_run
/// # use std::convert::TryFrom;
/// # use matrix_sdk::{Client, ruma::UserId};
/// # use url::Url;
/// # use futures::executor::block_on;
/// # let alice = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// let device = client.get_device(&alice, "DEVICEID".into())
/// .await
/// .unwrap()
/// .unwrap();
///
/// println!("{:?}", device.verified());
///
/// let verification = device.request_verification().await.unwrap();
/// # });
/// ```
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_device(
&self,
user_id: &UserId,
device_id: &DeviceId,
) -> StdResult<Option<Device>, CryptoStoreError> {
let device = self.base_client.get_device(user_id, device_id).await?;
Ok(device.map(|d| Device { inner: d, client: self.clone() }))
}
/// Get a E2EE identity of an user.
///
/// # Arguments
///
/// * `user_id` - The unique id of the user that the identity belongs to.
///
/// Returns a `UserIdentity` if one is found and the crypto store
/// didn't throw an error.
///
/// This will always return None if the client hasn't been logged in.
///
/// # Example
///
/// ```no_run
/// # use std::convert::TryFrom;
/// # use matrix_sdk::{Client, ruma::UserId};
/// # use url::Url;
/// # use futures::executor::block_on;
/// # let alice = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// let user = client.get_user_identity(&alice).await?;
///
/// if let Some(user) = user {
/// println!("{:?}", user.verified());
///
/// let verification = user.request_verification().await?;
/// }
/// # anyhow::Result::<()>::Ok(()) });
/// ```
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_user_identity(
&self,
user_id: &UserId,
) -> StdResult<Option<crate::encryption::identities::UserIdentity>, CryptoStoreError> {
use crate::encryption::identities::UserIdentity;
if let Some(olm) = self.base_client.olm_machine().await {
let identity = olm.get_identity(user_id).await?;
Ok(identity.map(|i| match i {
matrix_sdk_base::crypto::UserIdentities::Own(i) => {
UserIdentity::new_own(self.clone(), i)
}
matrix_sdk_base::crypto::UserIdentities::Other(i) => {
UserIdentity::new(self.clone(), i, self.get_dm_room(user_id))
}
}))
} else {
Ok(None)
}
}
/// Get the status of the private cross signing keys.
///
/// This can be used to check which private cross signing keys we have
/// stored locally.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
if let Some(machine) = self.base_client.olm_machine().await {
Some(machine.cross_signing_status().await)
} else {
None
}
}
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
fn get_dm_room(&self, user_id: &UserId) -> Option<room::Joined> {
let rooms = self.joined_rooms();
let room_pairs: Vec<_> =
rooms.iter().map(|r| (r.room_id().to_owned(), r.direct_target())).collect();
trace!(rooms =? room_pairs, "Finding direct room");
let room = rooms.into_iter().find(|r| r.direct_target().as_ref() == Some(user_id));
trace!(room =? room, "Found room");
room
}
/// Create and upload a new cross signing identity.
///
/// # Arguments
///
/// * `auth_data` - This request requires user interactive auth, the first
/// request needs to set this to `None` and will always fail with an
/// `UiaaResponse`. The response will contain information for the
/// interactive auth and the same request needs to be made but this time
/// with some `auth_data` provided.
///
/// # Examples
/// ```no_run
/// # use std::{convert::TryFrom, collections::BTreeMap};
/// # use matrix_sdk::{
/// # ruma::{api::client::r0::uiaa, assign, UserId},
/// # Client,
/// # };
/// # use url::Url;
/// # use futures::executor::block_on;
/// # use serde_json::json;
/// # let user_id = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// if let Err(e) = client.bootstrap_cross_signing(None).await {
/// if let Some(response) = e.uiaa_response() {
/// let auth_data = uiaa::AuthData::Password(assign!(
/// uiaa::Password::new(uiaa::UserIdentifier::MatrixId("example"), "wordpass"),
/// { session: response.session.as_deref() }
/// ));
///
/// client
/// .bootstrap_cross_signing(Some(auth_data))
/// .await
/// .expect("Couldn't bootstrap cross signing")
/// } else {
/// panic!("Error durign cross signing bootstrap {:#?}", e);
/// }
/// }
/// # })
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData<'_>>) -> Result<()> {
let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
let (request, signature_request) = olm.bootstrap_cross_signing(false).await?;
let request = assign!(UploadSigningKeysRequest::new(), {
auth: auth_data,
master_key: request.master_key,
self_signing_key: request.self_signing_key,
user_signing_key: request.user_signing_key,
});
self.send(request, None).await?;
self.send(signature_request, None).await?;
Ok(())
}
/// Get a map holding all the devices of an user.
///
/// This will always return an empty map if the client hasn't been logged
/// in.
///
/// # Arguments
///
/// * `user_id` - The unique id of the user that the devices belong to.
///
/// # Example
///
/// ```no_run
/// # use std::convert::TryFrom;
/// # use matrix_sdk::{Client, ruma::UserId};
/// # use url::Url;
/// # use futures::executor::block_on;
/// # let alice = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// let devices = client.get_user_devices(&alice).await.unwrap();
///
/// for device in devices.devices() {
/// println!("{:?}", device);
/// }
/// # });
/// ```
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_user_devices(
&self,
user_id: &UserId,
) -> StdResult<UserDevices, CryptoStoreError> {
let devices = self.base_client.get_user_devices(user_id).await?;
Ok(UserDevices { inner: devices, client: self.clone() })
}
/// Export E2EE keys that match the given predicate encrypting them with the
/// given passphrase.
///
/// # Arguments
///
/// * `path` - The file path where the exported key file will be saved.
///
/// * `passphrase` - The passphrase that will be used to encrypt the
/// exported
/// room keys.
///
/// * `predicate` - A closure that will be called for every known
/// `InboundGroupSession`, which represents a room key. If the closure
/// returns `true` the `InboundGroupSessoin` will be included in the export,
/// if the closure returns `false` it will not be included.
///
/// # Panics
///
/// This method will panic if it isn't run on a Tokio runtime.
///
/// This method will panic if it can't get enough randomness from the OS to
/// encrypt the exported keys securely.
///
/// # Examples
///
/// ```no_run
/// # use std::{path::PathBuf, time::Duration};
/// # use matrix_sdk::{
/// # Client, SyncSettings,
/// # ruma::room_id,
/// # };
/// # use futures::executor::block_on;
/// # use url::Url;
/// # block_on(async {
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
/// # let mut client = Client::new(homeserver).unwrap();
/// let path = PathBuf::from("/home/example/e2e-keys.txt");
/// // Export all room keys.
/// client
/// .export_keys(path, "secret-passphrase", |_| true)
/// .await
/// .expect("Can't export keys.");
///
/// // Export only the room keys for a certain room.
/// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
/// let room_id = room_id!("!test:localhost");
///
/// client
/// .export_keys(path, "secret-passphrase", |s| s.room_id() == &room_id)
/// .await
/// .expect("Can't export keys.");
/// # });
/// ```
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
#[cfg_attr(feature = "docs", doc(cfg(all(encryption, not(target_arch = "wasm32")))))]
pub async fn export_keys(
&self,
path: PathBuf,
passphrase: &str,
predicate: impl FnMut(&InboundGroupSession) -> bool,
) -> Result<()> {
let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
let keys = olm.export_keys(predicate).await?;
let passphrase = Zeroizing::new(passphrase.to_owned());
let encrypt = move || -> Result<()> {
let export: String = encrypt_key_export(&keys, &passphrase, 500_000)?;
let mut file = std::fs::File::create(path)?;
file.write_all(&export.into_bytes())?;
Ok(())
};
let task = tokio::task::spawn_blocking(encrypt);
task.await.expect("Task join error")
}
/// Import E2EE keys from the given file path.
///
/// # Arguments
///
/// * `path` - The file path where the exported key file will can be found.
///
/// * `passphrase` - The passphrase that should be used to decrypt the
/// exported room keys.
///
/// Returns a tuple of numbers that represent the number of sessions that
/// were imported and the total number of sessions that were found in the
/// key export.
///
/// # Panics
///
/// This method will panic if it isn't run on a Tokio runtime.
///
/// ```no_run
/// # use std::{path::PathBuf, time::Duration};
/// # use matrix_sdk::{
/// # Client, SyncSettings,
/// # ruma::room_id,
/// # };
/// # use futures::executor::block_on;
/// # use url::Url;
/// # block_on(async {
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
/// # let mut client = Client::new(homeserver).unwrap();
/// let path = PathBuf::from("/home/example/e2e-keys.txt");
/// client
/// .import_keys(path, "secret-passphrase")
/// .await
/// .expect("Can't import keys");
/// # });
/// ```
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
#[cfg_attr(feature = "docs", doc(cfg(all(encryption, not(target_arch = "wasm32")))))]
pub async fn import_keys(
&self,
path: PathBuf,
passphrase: &str,
) -> StdResult<(usize, usize), RoomKeyImportError> {
let olm = self.base_client.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?;
let passphrase = Zeroizing::new(passphrase.to_owned());
let decrypt = move || {
let file = std::fs::File::open(path)?;
decrypt_key_export(file, &passphrase)
};
let task = tokio::task::spawn_blocking(decrypt);
let import = task.await.expect("Task join error")?;
Ok(olm.import_keys(import, |_, _| {}).await?)
}
/// Get a media file's content.
///
/// If the content is encrypted and encryption is enabled, the content will
@ -2996,9 +2327,11 @@ impl Client {
#[cfg(feature = "encryption")]
let content = {
let mut cursor = Cursor::new(content);
let mut reader =
AttachmentDecryptor::new(&mut cursor, file.as_ref().clone().into())?;
let mut cursor = std::io::Cursor::new(content);
let mut reader = matrix_sdk_base::crypto::AttachmentDecryptor::new(
&mut cursor,
file.as_ref().clone().into(),
)?;
let mut decrypted = Vec::new();
reader.read_to_end(&mut decrypted)?;
@ -3168,23 +2501,6 @@ impl Client {
let request = whoami::Request::new();
self.send(request, None).await
}
#[cfg(feature = "encryption")]
pub(crate) async fn send_verification_request(
&self,
request: matrix_sdk_base::crypto::OutgoingVerificationRequest,
) -> Result<()> {
match request {
matrix_sdk_base::crypto::OutgoingVerificationRequest::ToDevice(t) => {
self.send_to_device(&t).await?;
}
matrix_sdk_base::crypto::OutgoingVerificationRequest::InRoom(r) => {
self.room_send_helper(&r).await?;
}
}
Ok(())
}
}
#[cfg(test)]

View File

@ -16,4 +16,684 @@
pub mod identities;
pub mod verification;
use std::{
collections::{BTreeMap, HashSet},
io::Write,
path::PathBuf,
result::Result as StdResult,
};
pub use matrix_sdk_base::crypto::{EncryptionInfo, LocalTrust};
use matrix_sdk_base::{
crypto::{store::CryptoStoreError, CrossSigningStatus, RoomMessageRequest, ToDeviceRequest},
deserialized_responses::RoomEvent,
};
use matrix_sdk_common::{instant::Duration, uuid::Uuid};
use ruma::{
api::client::r0::{
keys::{get_keys, upload_keys, upload_signing_keys::Request as UploadSigningKeysRequest},
message::send_message_event,
to_device::send_event_to_device::{
Request as RumaToDeviceRequest, Response as ToDeviceResponse,
},
uiaa::AuthData,
},
assign,
events::{AnyMessageEvent, AnyRoomEvent, AnySyncMessageEvent, EventType},
DeviceId, DeviceIdBox, UserId,
};
use tracing::{debug, instrument, trace};
use crate::{
encryption::{
identities::{Device, UserDevices},
verification::{SasVerification, Verification, VerificationRequest},
},
error::{HttpError, HttpResult, RoomKeyImportError},
room, Client, Error, Result,
};
impl Client {
/// Get the public ed25519 key of our own device. This is usually what is
/// called the fingerprint of the device.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn ed25519_key(&self) -> Option<String> {
self.base_client.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned())
}
/// Get the status of the private cross signing keys.
///
/// This can be used to check which private cross signing keys we have
/// stored locally.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
if let Some(machine) = self.base_client.olm_machine().await {
Some(machine.cross_signing_status().await)
} else {
None
}
}
/// Get all the tracked users we know about
///
/// Tracked users are users for which we keep the device list of E2EE
/// capable devices up to date.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn tracked_users(&self) -> HashSet<UserId> {
self.base_client.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default()
}
/// Get a verification object with the given flow id.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
let olm = self.base_client.olm_machine().await?;
olm.get_verification(user_id, flow_id).map(|v| match v {
matrix_sdk_base::crypto::Verification::SasV1(s) => {
SasVerification { inner: s, client: self.clone() }.into()
}
#[cfg(feature = "qrcode")]
matrix_sdk_base::crypto::Verification::QrV1(qr) => {
verification::QrVerification { inner: qr, client: self.clone() }.into()
}
})
}
/// Get a `VerificationRequest` object for the given user with the given
/// flow id.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_verification_request(
&self,
user_id: &UserId,
flow_id: impl AsRef<str>,
) -> Option<VerificationRequest> {
let olm = self.base_client.olm_machine().await?;
olm.get_verification_request(user_id, flow_id)
.map(|r| VerificationRequest { inner: r, client: self.clone() })
}
/// Get a specific device of a user.
///
/// # Arguments
///
/// * `user_id` - The unique id of the user that the device belongs to.
///
/// * `device_id` - The unique id of the device.
///
/// Returns a `Device` if one is found and the crypto store didn't throw an
/// error.
///
/// This will always return None if the client hasn't been logged in.
///
/// # Example
///
/// ```no_run
/// # use std::convert::TryFrom;
/// # use matrix_sdk::{Client, ruma::UserId};
/// # use url::Url;
/// # use futures::executor::block_on;
/// # let alice = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// let device = client.get_device(&alice, "DEVICEID".into())
/// .await
/// .unwrap()
/// .unwrap();
///
/// println!("{:?}", device.verified());
///
/// let verification = device.request_verification().await.unwrap();
/// # });
/// ```
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_device(
&self,
user_id: &UserId,
device_id: &DeviceId,
) -> StdResult<Option<Device>, CryptoStoreError> {
let device = self.base_client.get_device(user_id, device_id).await?;
Ok(device.map(|d| Device { inner: d, client: self.clone() }))
}
/// Get a map holding all the devices of an user.
///
/// This will always return an empty map if the client hasn't been logged
/// in.
///
/// # Arguments
///
/// * `user_id` - The unique id of the user that the devices belong to.
///
/// # Example
///
/// ```no_run
/// # use std::convert::TryFrom;
/// # use matrix_sdk::{Client, ruma::UserId};
/// # use url::Url;
/// # use futures::executor::block_on;
/// # let alice = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// let devices = client.get_user_devices(&alice).await.unwrap();
///
/// for device in devices.devices() {
/// println!("{:?}", device);
/// }
/// # });
/// ```
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_user_devices(
&self,
user_id: &UserId,
) -> StdResult<UserDevices, CryptoStoreError> {
let devices = self.base_client.get_user_devices(user_id).await?;
Ok(UserDevices { inner: devices, client: self.clone() })
}
/// Get a E2EE identity of an user.
///
/// # Arguments
///
/// * `user_id` - The unique id of the user that the identity belongs to.
///
/// Returns a `UserIdentity` if one is found and the crypto store
/// didn't throw an error.
///
/// This will always return None if the client hasn't been logged in.
///
/// # Example
///
/// ```no_run
/// # use std::convert::TryFrom;
/// # use matrix_sdk::{Client, ruma::UserId};
/// # use url::Url;
/// # use futures::executor::block_on;
/// # let alice = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// let user = client.get_user_identity(&alice).await?;
///
/// if let Some(user) = user {
/// println!("{:?}", user.verified());
///
/// let verification = user.request_verification().await?;
/// }
/// # anyhow::Result::<()>::Ok(()) });
/// ```
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn get_user_identity(
&self,
user_id: &UserId,
) -> StdResult<Option<crate::encryption::identities::UserIdentity>, CryptoStoreError> {
use crate::encryption::identities::UserIdentity;
if let Some(olm) = self.base_client.olm_machine().await {
let identity = olm.get_identity(user_id).await?;
Ok(identity.map(|i| match i {
matrix_sdk_base::crypto::UserIdentities::Own(i) => {
UserIdentity::new_own(self.clone(), i)
}
matrix_sdk_base::crypto::UserIdentities::Other(i) => {
UserIdentity::new(self.clone(), i, self.get_dm_room(user_id))
}
}))
} else {
Ok(None)
}
}
/// Create and upload a new cross signing identity.
///
/// # Arguments
///
/// * `auth_data` - This request requires user interactive auth, the first
/// request needs to set this to `None` and will always fail with an
/// `UiaaResponse`. The response will contain information for the
/// interactive auth and the same request needs to be made but this time
/// with some `auth_data` provided.
///
/// # Examples
/// ```no_run
/// # use std::{convert::TryFrom, collections::BTreeMap};
/// # use matrix_sdk::{
/// # ruma::{api::client::r0::uiaa, assign, UserId},
/// # Client,
/// # };
/// # use url::Url;
/// # use futures::executor::block_on;
/// # use serde_json::json;
/// # let user_id = UserId::try_from("@alice:example.org").unwrap();
/// # let homeserver = Url::parse("http://example.com").unwrap();
/// # let client = Client::new(homeserver).unwrap();
/// # block_on(async {
/// if let Err(e) = client.bootstrap_cross_signing(None).await {
/// if let Some(response) = e.uiaa_response() {
/// let auth_data = uiaa::AuthData::Password(assign!(
/// uiaa::Password::new(uiaa::UserIdentifier::MatrixId("example"), "wordpass"),
/// { session: response.session.as_deref() }
/// ));
///
/// client
/// .bootstrap_cross_signing(Some(auth_data))
/// .await
/// .expect("Couldn't bootstrap cross signing")
/// } else {
/// panic!("Error durign cross signing bootstrap {:#?}", e);
/// }
/// }
/// # })
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData<'_>>) -> Result<()> {
let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
let (request, signature_request) = olm.bootstrap_cross_signing(false).await?;
let request = assign!(UploadSigningKeysRequest::new(), {
auth: auth_data,
master_key: request.master_key,
self_signing_key: request.self_signing_key,
user_signing_key: request.user_signing_key,
});
self.send(request, None).await?;
self.send(signature_request, None).await?;
Ok(())
}
/// Export E2EE keys that match the given predicate encrypting them with the
/// given passphrase.
///
/// # Arguments
///
/// * `path` - The file path where the exported key file will be saved.
///
/// * `passphrase` - The passphrase that will be used to encrypt the
/// exported
/// room keys.
///
/// * `predicate` - A closure that will be called for every known
/// `InboundGroupSession`, which represents a room key. If the closure
/// returns `true` the `InboundGroupSessoin` will be included in the export,
/// if the closure returns `false` it will not be included.
///
/// # Panics
///
/// This method will panic if it isn't run on a Tokio runtime.
///
/// This method will panic if it can't get enough randomness from the OS to
/// encrypt the exported keys securely.
///
/// # Examples
///
/// ```no_run
/// # use std::{path::PathBuf, time::Duration};
/// # use matrix_sdk::{
/// # Client, SyncSettings,
/// # ruma::room_id,
/// # };
/// # use futures::executor::block_on;
/// # use url::Url;
/// # block_on(async {
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
/// # let mut client = Client::new(homeserver).unwrap();
/// let path = PathBuf::from("/home/example/e2e-keys.txt");
/// // Export all room keys.
/// client
/// .export_keys(path, "secret-passphrase", |_| true)
/// .await
/// .expect("Can't export keys.");
///
/// // Export only the room keys for a certain room.
/// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
/// let room_id = room_id!("!test:localhost");
///
/// client
/// .export_keys(path, "secret-passphrase", |s| s.room_id() == &room_id)
/// .await
/// .expect("Can't export keys.");
/// # });
/// ```
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
#[cfg_attr(feature = "docs", doc(cfg(all(encryption, not(target_arch = "wasm32")))))]
pub async fn export_keys(
&self,
path: PathBuf,
passphrase: &str,
predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
) -> Result<()> {
let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
let keys = olm.export_keys(predicate).await?;
let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
let encrypt = move || -> Result<()> {
let export: String =
matrix_sdk_base::crypto::encrypt_key_export(&keys, &passphrase, 500_000)?;
let mut file = std::fs::File::create(path)?;
file.write_all(&export.into_bytes())?;
Ok(())
};
let task = tokio::task::spawn_blocking(encrypt);
task.await.expect("Task join error")
}
/// Import E2EE keys from the given file path.
///
/// # Arguments
///
/// * `path` - The file path where the exported key file will can be found.
///
/// * `passphrase` - The passphrase that should be used to decrypt the
/// exported room keys.
///
/// Returns a tuple of numbers that represent the number of sessions that
/// were imported and the total number of sessions that were found in the
/// key export.
///
/// # Panics
///
/// This method will panic if it isn't run on a Tokio runtime.
///
/// ```no_run
/// # use std::{path::PathBuf, time::Duration};
/// # use matrix_sdk::{
/// # Client, SyncSettings,
/// # ruma::room_id,
/// # };
/// # use futures::executor::block_on;
/// # use url::Url;
/// # block_on(async {
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
/// # let mut client = Client::new(homeserver).unwrap();
/// let path = PathBuf::from("/home/example/e2e-keys.txt");
/// client
/// .import_keys(path, "secret-passphrase")
/// .await
/// .expect("Can't import keys");
/// # });
/// ```
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
#[cfg_attr(feature = "docs", doc(cfg(all(encryption, not(target_arch = "wasm32")))))]
pub async fn import_keys(
&self,
path: PathBuf,
passphrase: &str,
) -> StdResult<(usize, usize), RoomKeyImportError> {
let olm = self.base_client.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?;
let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
let decrypt = move || {
let file = std::fs::File::open(path)?;
matrix_sdk_base::crypto::decrypt_key_export(file, &passphrase)
};
let task = tokio::task::spawn_blocking(decrypt);
let import = task.await.expect("Task join error")?;
Ok(olm.import_keys(import, |_, _| {}).await?)
}
/// Tries to decrypt a `AnyRoomEvent`. Returns unencrypted room event when
/// decryption fails.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub(crate) async fn decrypt_room_event(&self, event: &AnyRoomEvent) -> RoomEvent {
if let Some(machine) = self.base_client.olm_machine().await {
if let AnyRoomEvent::Message(event) = event {
if let AnyMessageEvent::RoomEncrypted(_) = event {
let room_id = event.room_id();
// Turn the AnyMessageEvent into a AnySyncMessageEvent
let event = event.clone().into();
if let AnySyncMessageEvent::RoomEncrypted(e) = event {
if let Ok(decrypted) = machine.decrypt_room_event(&e, room_id).await {
let event = decrypted
.event
.deserialize()
.unwrap()
.into_full_event(room_id.clone())
.into();
let encryption_info = decrypted.encryption_info;
// Return decrytped room event
return RoomEvent { event, encryption_info };
}
}
}
}
}
// Fallback to unencrypted room event
RoomEvent { event: event.into(), encryption_info: None }
}
/// Query the server for users device keys.
///
/// # Panics
///
/// Panics if no key query needs to be done.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
#[instrument]
pub(crate) async fn keys_query(
&self,
request_id: &Uuid,
device_keys: BTreeMap<UserId, Vec<DeviceIdBox>>,
) -> Result<get_keys::Response> {
let request = assign!(get_keys::Request::new(), { device_keys });
let response = self.send(request, None).await?;
self.base_client.mark_request_as_sent(request_id, &response).await?;
Ok(response)
}
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
async fn send_account_data(
&self,
content: ruma::events::AnyGlobalAccountDataEventContent,
) -> Result<ruma::api::client::r0::config::set_global_account_data::Response> {
let own_user =
self.user_id().await.ok_or_else(|| Error::from(HttpError::AuthenticationRequired))?;
let data = serde_json::value::to_raw_value(&content)?;
let request = ruma::api::client::r0::config::set_global_account_data::Request::new(
&data,
ruma::events::EventContent::event_type(&content),
&own_user,
);
Ok(self.send(request, None).await?)
}
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
pub(crate) async fn create_dm_room(&self, user_id: UserId) -> Result<Option<room::Joined>> {
use ruma::{
api::client::r0::room::create_room::RoomPreset,
events::AnyGlobalAccountDataEventContent,
};
const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
// First we create the DM room, where we invite the user and tell the
// invitee that the room should be a DM.
let invite = &[user_id.clone()];
let request = assign!(
ruma::api::client::r0::room::create_room::Request::new(),
{
invite,
is_direct: true,
preset: Some(RoomPreset::TrustedPrivateChat),
}
);
let response = self.send(request, None).await?;
// Now we need to mark the room as a DM for ourselves, we fetch the
// existing `m.direct` event and append the room to the list of DMs we
// have with this user.
let mut content = self
.store()
.get_account_data_event(EventType::Direct)
.await?
.map(|e| e.deserialize())
.transpose()?
.and_then(|e| {
if let AnyGlobalAccountDataEventContent::Direct(c) = e.content() {
Some(c)
} else {
None
}
})
.unwrap_or_else(|| ruma::events::direct::DirectEventContent(BTreeMap::new()));
content.entry(user_id.to_owned()).or_default().push(response.room_id.to_owned());
// TODO We should probably save the fact that we need to send this out
// because otherwise we might end up in a state where we have a DM that
// isn't marked as one.
self.send_account_data(AnyGlobalAccountDataEventContent::Direct(content)).await?;
// If the room is already in our store, fetch it, otherwise wait for a
// sync to be done which should put the room into our store.
if let Some(room) = self.get_joined_room(&response.room_id) {
Ok(Some(room))
} else {
self.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME);
Ok(self.get_joined_room(&response.room_id))
}
}
/// Claim one-time keys creating new Olm sessions.
///
/// # Arguments
///
/// * `users` - The list of user/device pairs that we should claim keys for.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
#[instrument(skip(users))]
pub(crate) async fn claim_one_time_keys(
&self,
users: impl Iterator<Item = &UserId>,
) -> Result<()> {
let _lock = self.key_claim_lock.lock().await;
if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? {
let response = self.send(request, None).await?;
self.base_client.mark_request_as_sent(&request_id, &response).await?;
}
Ok(())
}
/// Upload the E2E encryption keys.
///
/// This uploads the long lived device keys as well as the required amount
/// of one-time keys.
///
/// # Panics
///
/// Panics if the client isn't logged in, or if no encryption keys need to
/// be uploaded.
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
#[instrument]
pub(crate) async fn keys_upload(
&self,
request_id: &Uuid,
request: &upload_keys::Request,
) -> Result<upload_keys::Response> {
debug!(
"Uploading encryption keys device keys: {}, one-time-keys: {}",
request.device_keys.is_some(),
request.one_time_keys.as_ref().map_or(0, |k| k.len())
);
let response = self.send(request.clone(), None).await?;
self.base_client.mark_request_as_sent(request_id, &response).await?;
Ok(response)
}
#[cfg(feature = "encryption")]
pub(crate) async fn room_send_helper(
&self,
request: &RoomMessageRequest,
) -> Result<send_message_event::Response> {
let content = request.content.clone();
let txn_id = request.txn_id;
let room_id = &request.room_id;
self.get_joined_room(room_id)
.expect("Can't send a message to a room that isn't known to the store")
.send(content, Some(txn_id))
.await
}
#[cfg(feature = "encryption")]
pub(crate) async fn send_to_device(
&self,
request: &ToDeviceRequest,
) -> HttpResult<ToDeviceResponse> {
let txn_id_string = request.txn_id_string();
let request = RumaToDeviceRequest::new_raw(
request.event_type.as_str(),
&txn_id_string,
request.messages.clone(),
);
self.send(request, None).await
}
#[cfg(feature = "encryption")]
pub(crate) async fn send_verification_request(
&self,
request: matrix_sdk_base::crypto::OutgoingVerificationRequest,
) -> Result<()> {
match request {
matrix_sdk_base::crypto::OutgoingVerificationRequest::ToDevice(t) => {
self.send_to_device(&t).await?;
}
matrix_sdk_base::crypto::OutgoingVerificationRequest::InRoom(r) => {
self.room_send_helper(&r).await?;
}
}
Ok(())
}
#[cfg(feature = "encryption")]
#[cfg_attr(feature = "docs", doc(cfg(encryption)))]
fn get_dm_room(&self, user_id: &UserId) -> Option<room::Joined> {
let rooms = self.joined_rooms();
let room_pairs: Vec<_> =
rooms.iter().map(|r| (r.room_id().to_owned(), r.direct_target())).collect();
trace!(rooms =? room_pairs, "Finding direct room");
let room = rooms.into_iter().find(|r| r.direct_target().as_ref() == Some(user_id));
trace!(room =? room, "Found room");
room
}
}