common: Add a executor abstraction so we can spawn tasks under WASM

master
Damir Jelić 2021-03-22 20:23:15 +01:00
parent 75ac29540d
commit bbe812f1d9
6 changed files with 54 additions and 6 deletions

View File

@ -34,5 +34,7 @@ default-features = false
features = ["sync"] features = ["sync"]
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
futures = "0.3.12"
futures-locks = { version = "0.6.0", default-features = false } futures-locks = { version = "0.6.0", default-features = false }
wasm-bindgen-futures = "0.4"
uuid = { version = "0.8.2", default-features = false, features = ["v4", "wasm-bindgen"] } uuid = { version = "0.8.2", default-features = false, features = ["v4", "wasm-bindgen"] }

View File

@ -0,0 +1,40 @@
#[cfg(target_arch = "wasm32")]
use std::{
pin::Pin,
task::{Context, Poll},
};
#[cfg(not(target_arch = "wasm32"))]
pub use tokio::spawn;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local;
#[cfg(target_arch = "wasm32")]
use futures::{future::RemoteHandle, Future, FutureExt};
#[cfg(target_arch = "wasm32")]
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + 'static,
{
let fut = future.unit_error();
let (fut, handle) = fut.remote_handle();
spawn_local(fut);
JoinHandle { handle }
}
#[cfg(target_arch = "wasm32")]
pub struct JoinHandle<T> {
handle: RemoteHandle<Result<T, ()>>,
}
#[cfg(target_arch = "wasm32")]
impl<T: 'static> Future for JoinHandle<T> {
type Output = Result<T, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.handle).poll(cx)
}
}

View File

@ -14,6 +14,7 @@ pub use ruma::{
pub use uuid; pub use uuid;
pub mod deserialized_responses; pub mod deserialized_responses;
pub mod executor;
pub mod locks; pub mod locks;
/// Super trait that is used for our store traits, this trait will differ if /// Super trait that is used for our store traits, this trait will differ if

View File

@ -29,7 +29,6 @@ serde_json = "1.0.61"
zeroize = { version = "1.2.0", features = ["zeroize_derive"] } zeroize = { version = "1.2.0", features = ["zeroize_derive"] }
# Misc dependencies # Misc dependencies
tokio = { version = "1.1.0", default-features = false, features = ["rt", "rt-multi-thread"] }
futures = "0.3.12" futures = "0.3.12"
sled = { version = "0.34.6", optional = true } sled = { version = "0.34.6", optional = true }
thiserror = "1.0.23" thiserror = "1.0.23"

View File

@ -23,6 +23,7 @@ use tracing::{trace, warn};
use matrix_sdk_common::{ use matrix_sdk_common::{
api::r0::keys::get_keys::Response as KeysQueryResponse, api::r0::keys::get_keys::Response as KeysQueryResponse,
encryption::DeviceKeys, encryption::DeviceKeys,
executor::spawn,
identifiers::{DeviceIdBox, UserId}, identifiers::{DeviceIdBox, UserId},
}; };
@ -160,7 +161,7 @@ impl IdentityManager {
); );
None None
} else { } else {
Some(tokio::spawn(Self::update_or_create_device( Some(spawn(Self::update_or_create_device(
store.clone(), store.clone(),
device_keys, device_keys,
))) )))
@ -170,7 +171,9 @@ impl IdentityManager {
let results = join_all(tasks).await; let results = join_all(tasks).await;
for device in results { for device in results {
match device.expect("Creating or updating a device panicked")? { let device = device.expect("Creating or updating a device panicked")?;
match device {
DeviceChange::New(d) => changes.new.push(d), DeviceChange::New(d) => changes.new.push(d),
DeviceChange::Updated(d) => changes.changed.push(d), DeviceChange::Updated(d) => changes.changed.push(d),
DeviceChange::None => (), DeviceChange::None => (),
@ -211,7 +214,7 @@ impl IdentityManager {
let tasks = device_keys_map let tasks = device_keys_map
.into_iter() .into_iter()
.map(|(user_id, device_keys_map)| { .map(|(user_id, device_keys_map)| {
tokio::spawn(Self::update_user_devices( spawn(Self::update_user_devices(
self.store.clone(), self.store.clone(),
self.user_id.clone(), self.user_id.clone(),
self.device_id.clone(), self.device_id.clone(),
@ -224,6 +227,7 @@ impl IdentityManager {
for result in results { for result in results {
let change_fragment = result.expect("Panic while updating user devices")?; let change_fragment = result.expect("Panic while updating user devices")?;
changes.extend(change_fragment); changes.extend(change_fragment);
} }

View File

@ -26,6 +26,7 @@ use matrix_sdk_common::{
room::{encrypted::EncryptedEventContent, history_visibility::HistoryVisibility}, room::{encrypted::EncryptedEventContent, history_visibility::HistoryVisibility},
AnyMessageEventContent, EventType, AnyMessageEventContent, EventType,
}, },
executor::spawn,
identifiers::{DeviceId, DeviceIdBox, RoomId, UserId}, identifiers::{DeviceId, DeviceIdBox, RoomId, UserId},
uuid::Uuid, uuid::Uuid,
}; };
@ -223,7 +224,7 @@ impl GroupSessionManager {
let tasks: Vec<_> = devices let tasks: Vec<_> = devices
.iter() .iter()
.map(|d| tokio::spawn(encrypt(d.clone(), content.clone()))) .map(|d| spawn(encrypt(d.clone(), content.clone())))
.collect(); .collect();
let results = join_all(tasks).await; let results = join_all(tasks).await;
@ -478,7 +479,7 @@ impl GroupSessionManager {
let tasks: Vec<_> = devices let tasks: Vec<_> = devices
.chunks(Self::MAX_TO_DEVICE_MESSAGES) .chunks(Self::MAX_TO_DEVICE_MESSAGES)
.map(|chunk| { .map(|chunk| {
tokio::spawn(Self::encrypt_request( spawn(Self::encrypt_request(
chunk.to_vec(), chunk.to_vec(),
key_content.clone(), key_content.clone(),
outbound.clone(), outbound.clone(),
@ -490,6 +491,7 @@ impl GroupSessionManager {
for result in join_all(tasks).await { for result in join_all(tasks).await {
let used_sessions: OlmResult<Vec<Session>> = result.expect("Encryption task paniced"); let used_sessions: OlmResult<Vec<Session>> = result.expect("Encryption task paniced");
changes.sessions.extend(used_sessions?); changes.sessions.extend(used_sessions?);
} }