From bbe812f1d96017d8dec7edcaf22f840357967228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 22 Mar 2021 20:23:15 +0100 Subject: [PATCH] common: Add a executor abstraction so we can spawn tasks under WASM --- matrix_sdk_common/Cargo.toml | 2 + matrix_sdk_common/src/executor.rs | 40 +++++++++++++++++++ matrix_sdk_common/src/lib.rs | 1 + matrix_sdk_crypto/Cargo.toml | 1 - matrix_sdk_crypto/src/identities/manager.rs | 10 +++-- .../src/session_manager/group_sessions.rs | 6 ++- 6 files changed, 54 insertions(+), 6 deletions(-) create mode 100644 matrix_sdk_common/src/executor.rs diff --git a/matrix_sdk_common/Cargo.toml b/matrix_sdk_common/Cargo.toml index daab24e2..223c3658 100644 --- a/matrix_sdk_common/Cargo.toml +++ b/matrix_sdk_common/Cargo.toml @@ -34,5 +34,7 @@ default-features = false features = ["sync"] [target.'cfg(target_arch = "wasm32")'.dependencies] +futures = "0.3.12" futures-locks = { version = "0.6.0", default-features = false } +wasm-bindgen-futures = "0.4" uuid = { version = "0.8.2", default-features = false, features = ["v4", "wasm-bindgen"] } diff --git a/matrix_sdk_common/src/executor.rs b/matrix_sdk_common/src/executor.rs new file mode 100644 index 00000000..2d2cce52 --- /dev/null +++ b/matrix_sdk_common/src/executor.rs @@ -0,0 +1,40 @@ +#[cfg(target_arch = "wasm32")] +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +#[cfg(not(target_arch = "wasm32"))] +pub use tokio::spawn; + +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local; + +#[cfg(target_arch = "wasm32")] +use futures::{future::RemoteHandle, Future, FutureExt}; + +#[cfg(target_arch = "wasm32")] +pub fn spawn(future: F) -> JoinHandle +where + F: Future + 'static, +{ + let fut = future.unit_error(); + let (fut, handle) = fut.remote_handle(); + spawn_local(fut); + + JoinHandle { handle } +} + +#[cfg(target_arch = "wasm32")] +pub struct JoinHandle { + handle: RemoteHandle>, +} + +#[cfg(target_arch = "wasm32")] +impl Future for JoinHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.handle).poll(cx) + } +} diff --git a/matrix_sdk_common/src/lib.rs b/matrix_sdk_common/src/lib.rs index bb930356..dc4805f9 100644 --- a/matrix_sdk_common/src/lib.rs +++ b/matrix_sdk_common/src/lib.rs @@ -14,6 +14,7 @@ pub use ruma::{ pub use uuid; pub mod deserialized_responses; +pub mod executor; pub mod locks; /// Super trait that is used for our store traits, this trait will differ if diff --git a/matrix_sdk_crypto/Cargo.toml b/matrix_sdk_crypto/Cargo.toml index d560f15b..5767be09 100644 --- a/matrix_sdk_crypto/Cargo.toml +++ b/matrix_sdk_crypto/Cargo.toml @@ -29,7 +29,6 @@ serde_json = "1.0.61" zeroize = { version = "1.2.0", features = ["zeroize_derive"] } # Misc dependencies -tokio = { version = "1.1.0", default-features = false, features = ["rt", "rt-multi-thread"] } futures = "0.3.12" sled = { version = "0.34.6", optional = true } thiserror = "1.0.23" diff --git a/matrix_sdk_crypto/src/identities/manager.rs b/matrix_sdk_crypto/src/identities/manager.rs index 080a6432..5a1a9b58 100644 --- a/matrix_sdk_crypto/src/identities/manager.rs +++ b/matrix_sdk_crypto/src/identities/manager.rs @@ -23,6 +23,7 @@ use tracing::{trace, warn}; use matrix_sdk_common::{ api::r0::keys::get_keys::Response as KeysQueryResponse, encryption::DeviceKeys, + executor::spawn, identifiers::{DeviceIdBox, UserId}, }; @@ -160,7 +161,7 @@ impl IdentityManager { ); None } else { - Some(tokio::spawn(Self::update_or_create_device( + Some(spawn(Self::update_or_create_device( store.clone(), device_keys, ))) @@ -170,7 +171,9 @@ impl IdentityManager { let results = join_all(tasks).await; for device in results { - match device.expect("Creating or updating a device panicked")? { + let device = device.expect("Creating or updating a device panicked")?; + + match device { DeviceChange::New(d) => changes.new.push(d), DeviceChange::Updated(d) => changes.changed.push(d), DeviceChange::None => (), @@ -211,7 +214,7 @@ impl IdentityManager { let tasks = device_keys_map .into_iter() .map(|(user_id, device_keys_map)| { - tokio::spawn(Self::update_user_devices( + spawn(Self::update_user_devices( self.store.clone(), self.user_id.clone(), self.device_id.clone(), @@ -224,6 +227,7 @@ impl IdentityManager { for result in results { let change_fragment = result.expect("Panic while updating user devices")?; + changes.extend(change_fragment); } diff --git a/matrix_sdk_crypto/src/session_manager/group_sessions.rs b/matrix_sdk_crypto/src/session_manager/group_sessions.rs index 5a576fc3..68af665a 100644 --- a/matrix_sdk_crypto/src/session_manager/group_sessions.rs +++ b/matrix_sdk_crypto/src/session_manager/group_sessions.rs @@ -26,6 +26,7 @@ use matrix_sdk_common::{ room::{encrypted::EncryptedEventContent, history_visibility::HistoryVisibility}, AnyMessageEventContent, EventType, }, + executor::spawn, identifiers::{DeviceId, DeviceIdBox, RoomId, UserId}, uuid::Uuid, }; @@ -223,7 +224,7 @@ impl GroupSessionManager { let tasks: Vec<_> = devices .iter() - .map(|d| tokio::spawn(encrypt(d.clone(), content.clone()))) + .map(|d| spawn(encrypt(d.clone(), content.clone()))) .collect(); let results = join_all(tasks).await; @@ -478,7 +479,7 @@ impl GroupSessionManager { let tasks: Vec<_> = devices .chunks(Self::MAX_TO_DEVICE_MESSAGES) .map(|chunk| { - tokio::spawn(Self::encrypt_request( + spawn(Self::encrypt_request( chunk.to_vec(), key_content.clone(), outbound.clone(), @@ -490,6 +491,7 @@ impl GroupSessionManager { for result in join_all(tasks).await { let used_sessions: OlmResult> = result.expect("Encryption task paniced"); + changes.sessions.extend(used_sessions?); }