diff --git a/matrix_sdk/Cargo.toml b/matrix_sdk/Cargo.toml index 805283fe..46372aa4 100644 --- a/matrix_sdk/Cargo.toml +++ b/matrix_sdk/Cargo.toml @@ -68,6 +68,7 @@ features = ["wasm-bindgen"] [dev-dependencies] async-trait = "0.1.40" +async-std = { version = "1.6.5", features = ["unstable"] } dirs = "3.0.1" matrix-sdk-test = { version = "0.1.0", path = "../matrix_sdk_test" } tokio = { version = "0.2.22", features = ["rt-threaded", "macros"] } diff --git a/matrix_sdk/examples/autojoin.rs b/matrix_sdk/examples/autojoin.rs index ece720e7..2af9a61b 100644 --- a/matrix_sdk/examples/autojoin.rs +++ b/matrix_sdk/examples/autojoin.rs @@ -64,9 +64,7 @@ async fn login_and_sync( .add_event_emitter(Box::new(AutoJoinBot::new(client.clone()))) .await; - client - .sync_forever(SyncSettings::default(), |_| async {}) - .await; + client.sync(SyncSettings::default()).await; Ok(()) } diff --git a/matrix_sdk/examples/command_bot.rs b/matrix_sdk/examples/command_bot.rs index 9668b366..4f2d9fea 100644 --- a/matrix_sdk/examples/command_bot.rs +++ b/matrix_sdk/examples/command_bot.rs @@ -13,7 +13,7 @@ use url::Url; struct CommandBot { /// This clone of the `Client` will send requests to the server, - /// while the other keeps us in sync with the server using `sync_forever`. + /// while the other keeps us in sync with the server using `sync`. client: Client, } @@ -91,18 +91,18 @@ async fn login_and_sync( // An initial sync to set up state and so our bot doesn't respond to old messages. // If the `StateStore` finds saved state in the location given the initial sync will // be skipped in favor of loading state from the store - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); // add our CommandBot to be notified of incoming messages, we do this after the initial // sync to avoid responding to messages before the bot was running. client .add_event_emitter(Box::new(CommandBot::new(client.clone()))) .await; - // since we called sync before we `sync_forever` we must pass that sync token to - // `sync_forever` + // since we called `sync_once` before we entered our sync loop we must pass + // that sync token to `sync` let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); // this keeps state from the server streaming in to CommandBot via the EventEmitter trait - client.sync_forever(settings, |_| async {}).await; + client.sync(settings).await; Ok(()) } diff --git a/matrix_sdk/examples/emoji_verification.rs b/matrix_sdk/examples/emoji_verification.rs index 4b05d9c3..509e7ea4 100644 --- a/matrix_sdk/examples/emoji_verification.rs +++ b/matrix_sdk/examples/emoji_verification.rs @@ -2,7 +2,8 @@ use std::{env, io, process::exit}; use url::Url; use matrix_sdk::{ - self, events::AnyToDeviceEvent, identifiers::UserId, Client, ClientConfig, Sas, SyncSettings, + self, events::AnyToDeviceEvent, identifiers::UserId, Client, ClientConfig, LoopCtrl, Sas, + SyncSettings, }; async fn wait_for_confirmation(client: Client, sas: Sas) { @@ -69,7 +70,7 @@ async fn login( let client_ref = &client; client - .sync_forever(SyncSettings::new(), |response| async move { + .sync_with_callback(SyncSettings::new(), |response| async move { let client = &client_ref; for event in &response.to_device.events { @@ -116,6 +117,8 @@ async fn login( _ => (), } } + + LoopCtrl::Continue }) .await; diff --git a/matrix_sdk/examples/image_bot.rs b/matrix_sdk/examples/image_bot.rs index 9479545c..68099d89 100644 --- a/matrix_sdk/examples/image_bot.rs +++ b/matrix_sdk/examples/image_bot.rs @@ -81,13 +81,13 @@ async fn login_and_sync( .login(&username, &password, None, Some("command bot")) .await?; - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); client .add_event_emitter(Box::new(ImageBot::new(client.clone(), image))) .await; let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); - client.sync_forever(settings, |_| async {}).await; + client.sync(settings).await; Ok(()) } diff --git a/matrix_sdk/examples/login.rs b/matrix_sdk/examples/login.rs index 875a3ee9..b63155b5 100644 --- a/matrix_sdk/examples/login.rs +++ b/matrix_sdk/examples/login.rs @@ -52,7 +52,7 @@ async fn login( client .login(username, password, None, Some("rust-sdk")) .await?; - client.sync_forever(SyncSettings::new(), |_| async {}).await; + client.sync(SyncSettings::new()).await; Ok(()) } diff --git a/matrix_sdk/examples/wasm_command_bot/src/lib.rs b/matrix_sdk/examples/wasm_command_bot/src/lib.rs index a62f3b95..500736cf 100644 --- a/matrix_sdk/examples/wasm_command_bot/src/lib.rs +++ b/matrix_sdk/examples/wasm_command_bot/src/lib.rs @@ -7,7 +7,7 @@ use matrix_sdk::{ AnyMessageEventContent, AnySyncMessageEvent, AnySyncRoomEvent, SyncMessageEvent, }, identifiers::RoomId, - Client, ClientConfig, SyncSettings, + Client, ClientConfig, LoopCtrl, SyncSettings, }; use url::Url; use wasm_bindgen::prelude::*; @@ -41,7 +41,7 @@ impl WasmBot { self.0.room_send(&room_id, content, None).await.unwrap(); } } - async fn on_sync_response(&self, response: SyncResponse) { + async fn on_sync_response(&self, response: SyncResponse) -> LoopCtrl { console::log_1(&"Synced".to_string().into()); for (room_id, room) in response.rooms.join { @@ -53,6 +53,8 @@ impl WasmBot { } } } + + LoopCtrl::Continue } } @@ -73,11 +75,11 @@ pub async fn run() -> Result { let bot = WasmBot(client.clone()); - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); client - .sync_forever(settings, |response| bot.on_sync_response(response)) + .sync_with_callback(settings, |response| bot.on_sync_response(response)) .await; Ok(JsValue::NULL) diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 6feaa40c..9d224765 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -47,6 +47,20 @@ use matrix_sdk_base::crypto::{ AttachmentEncryptor, OutgoingRequests, ToDeviceRequest, }; +/// Enum controlling if a loop running callbacks should continue or abort. +/// +/// This is mainly used in the [`sync_with_callback`] method, the return value +/// of the provided callback controls if the sync loop should be exited. +/// +/// [`sync_with_callback`]: #method.sync_with_callback +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LoopCtrl { + /// Continue running the loop. + Continue, + /// Break out of the loop. + Break, +} + use matrix_sdk_common::{ api::r0::{ account::register, @@ -477,7 +491,7 @@ impl Client { /// If this isn't the first login a device id should be provided to restore /// the correct stores. /// - /// Alternatively the `restore_login()` method can be used to restore a + /// Alternatively the [`restore_login`] method can be used to restore a /// logged in client without the password. /// /// # Arguments @@ -490,6 +504,29 @@ impl Client { /// not given the homeserver will create one. Can be an existing /// device_id from a previous login call. Note that this should be done /// only if the client also holds the encryption keys for this device. + /// + /// # Example + /// ```no_run + /// # use std::convert::TryFrom; + /// # use matrix_sdk::Client; + /// # use matrix_sdk::identifiers::DeviceId; + /// # use matrix_sdk_common::assign; + /// # use futures::executor::block_on; + /// # use url::Url; + /// # let homeserver = Url::parse("http://example.com").unwrap(); + /// # block_on(async { + /// let client = Client::new(homeserver).unwrap(); + /// let user = "example"; + /// let response = client + /// .login(user, "wordpass", None, Some("My bot")).await + /// .unwrap(); + /// + /// println!("Logged in as {}, got device_id {} and access_token {}", + /// user, response.device_id, response.access_token); + /// # }) + /// ``` + /// + /// [`restore_login`]: #method.restore_login #[instrument(skip(password))] pub async fn login( &self, @@ -521,13 +558,15 @@ impl Client { /// This can be used to restore the client to a logged in state, loading all /// the stored state and encryption keys. /// - /// Alternatively, if the whole session isn't stored the `login()` method + /// Alternatively, if the whole session isn't stored the [`login`] method /// can be used with a device id. /// /// # Arguments /// /// * `session` - A session that the user already has from a /// previous login call. + /// + /// [`login`]: #method.login pub async fn restore_login(&self, session: Session) -> Result<()> { Ok(self.base_client.restore_login(session).await?) } @@ -1342,14 +1381,20 @@ impl Client { /// Synchronize the client's state with the latest state on the server. /// - /// If a `StateStore` is provided and this is the initial sync state will - /// be loaded from the state store. + /// **Note**: You should not use this method to repeatedly sync if encryption + /// support is enabled, the [`sync`] method will make additional + /// requests between syncs that are needed for E2E encryption to work. /// /// # Arguments /// /// * `sync_settings` - Settings for the sync call. + /// + /// [`sync`]: #method.sync #[instrument] - pub async fn sync(&self, sync_settings: SyncSettings<'_>) -> Result { + pub async fn sync_once( + &self, + sync_settings: SyncSettings<'_>, + ) -> Result { let request = assign!(sync_events::Request::new(), { filter: sync_settings.filter, since: sync_settings.token.as_deref(), @@ -1367,6 +1412,23 @@ impl Client { Ok(response) } + /// Repeatedly call sync to synchronize the client state with the server. + /// + /// This method will never return, if cancellation is needed the method + /// should be wrapped in a cancelable task or the [`sync_with_callback`] + /// method can be used. + /// + /// # Arguments + /// + /// * `sync_settings` - Settings for the sync call. Note that those settings + /// will be only used for the first sync call. + /// + /// [`sync_with_callback`]: #method.sync_with_callback + pub async fn sync(&self, sync_settings: SyncSettings<'_>) { + self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }) + .await + } + /// Repeatedly call sync to synchronize the client state with the server. /// /// # Arguments @@ -1375,7 +1437,10 @@ impl Client { /// will be only used for the first sync call. /// /// * `callback` - A callback that will be called every time a successful - /// response has been fetched from the server. + /// response has been fetched from the server. The callback must return + /// a boolean which signalizes if the method should stop syncing. If the + /// callback returns `false` the sync will continue, if the callback + /// returns `true` the sync will be stopped. /// /// # Examples /// @@ -1385,18 +1450,17 @@ impl Client { /// /// ```compile_fail,E0658 /// # use matrix_sdk::events::{ - /// # collections::all::RoomEvent, /// # room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, - /// # EventResult, /// # }; /// # use matrix_sdk::Room; /// # use std::sync::{Arc, RwLock}; - /// # use matrix_sdk::{Client, SyncSettings}; + /// # use std::time::Duration; + /// # use matrix_sdk::{Client, SyncSettings, LoopCtrl}; /// # use url::Url; /// # use futures::executor::block_on; /// # block_on(async { /// # let homeserver = Url::parse("http://localhost:8080").unwrap(); - /// # let mut client = Client::new(homeserver, None).unwrap(); + /// # let mut client = Client::new(homeserver).unwrap(); /// /// use async_std::sync::channel; /// @@ -1404,31 +1468,32 @@ impl Client { /// /// let sync_channel = &tx; /// let sync_settings = SyncSettings::new() - /// .timeout(30_000) - /// .unwrap(); + /// .timeout(Duration::from_secs(30)); /// /// client - /// .sync_forever(sync_settings, async move |response| { + /// .sync_with_callback(sync_settings, async move |response| { /// let channel = sync_channel; /// /// for (room_id, room) in response.rooms.join { /// for event in room.timeline.events { - /// if let EventResult::Ok(e) = event { + /// if let Ok(e) = event.deserialize() { /// channel.send(e).await; /// } /// } /// } + /// + /// LoopCtrl::Continue /// }) /// .await; /// }) /// ``` #[instrument(skip(callback))] - pub async fn sync_forever( + pub async fn sync_with_callback( &self, sync_settings: SyncSettings<'_>, callback: impl Fn(sync_events::Response) -> C, ) where - C: Future, + C: Future, { let mut sync_settings = sync_settings; let filter = sync_settings.filter; @@ -1439,7 +1504,7 @@ impl Client { } loop { - let response = self.sync(sync_settings.clone()).await; + let response = self.sync_once(sync_settings.clone()).await; let response = match response { Ok(r) => r, @@ -1479,7 +1544,9 @@ impl Client { } } - callback(response).await; + if callback(response).await == LoopCtrl::Break { + return; + } let now = Instant::now(); @@ -1766,7 +1833,10 @@ impl Client { /// ``` #[cfg(feature = "encryption")] #[cfg(not(target_arch = "wasm32"))] - #[cfg_attr(feature = "docs", doc(cfg(encryption)))] + #[cfg_attr( + feature = "docs", + doc(cfg(all(encryption, not(target_arch = "wasm32")))) + )] pub async fn export_keys( &self, path: PathBuf, @@ -1816,7 +1886,10 @@ impl Client { /// ``` #[cfg(feature = "encryption")] #[cfg(not(target_arch = "wasm32"))] - #[cfg_attr(feature = "docs", doc(cfg(encryption)))] + #[cfg_attr( + feature = "docs", + doc(cfg(all(encryption, not(target_arch = "wasm32")))) + )] pub async fn import_keys(&self, path: PathBuf, passphrase: &str) -> Result<()> { let olm = self .base_client @@ -1944,7 +2017,7 @@ mod test { let room = client.get_joined_room(&room_id).await; assert!(room.is_none()); - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); let room = client.get_left_room(&room_id).await; assert!(room.is_none()); @@ -1959,7 +2032,10 @@ mod test { joined_client.restore_login(session).await.unwrap(); // joined room reloaded from state store - joined_client.sync(SyncSettings::default()).await.unwrap(); + joined_client + .sync_once(SyncSettings::default()) + .await + .unwrap(); let room = joined_client.get_joined_room(&room_id).await; assert!(room.is_some()); @@ -1971,7 +2047,10 @@ mod test { .with_body(test_json::LEAVE_SYNC_EVENT.to_string()) .create(); - joined_client.sync(SyncSettings::default()).await.unwrap(); + joined_client + .sync_once(SyncSettings::default()) + .await + .unwrap(); let room = joined_client.get_joined_room(&room_id).await; assert!(room.is_none()); @@ -1994,7 +2073,7 @@ mod test { .create(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); // let bc = &client.base_client; // let ignored_users = bc.ignored_users.read().await; @@ -2463,7 +2542,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let rooms_lock = &client.base_client.joined_rooms(); let rooms = rooms_lock.read().await; @@ -2491,7 +2570,7 @@ mod test { .create(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let mut room_names = vec![]; for room in client.joined_rooms().read().await.values() { @@ -2514,7 +2593,7 @@ mod test { .with_body(test_json::INVITE_SYNC.to_string()) .create(); - let _response = client.sync(SyncSettings::default()).await.unwrap(); + let _response = client.sync_once(SyncSettings::default()).await.unwrap(); assert!(client.joined_rooms().read().await.is_empty()); assert!(client.left_rooms().read().await.is_empty()); @@ -2539,7 +2618,7 @@ mod test { .with_body(test_json::LEAVE_SYNC.to_string()) .create(); - let _response = client.sync(SyncSettings::default()).await.unwrap(); + let _response = client.sync_once(SyncSettings::default()).await.unwrap(); assert!(client.joined_rooms().read().await.is_empty()); assert!(!client.left_rooms().read().await.is_empty()); @@ -2583,14 +2662,14 @@ mod test { let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); // gather state to save to the db, the first time through loading will be skipped - let _ = client.sync(sync_settings.clone()).await.unwrap(); + let _ = client.sync_once(sync_settings.clone()).await.unwrap(); // now syncing the client will update from the state store let config = ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); let client = Client::new_with_config(homeserver, config).unwrap(); client.restore_login(session.clone()).await.unwrap(); - client.sync(sync_settings).await.unwrap(); + client.sync_once(sync_settings).await.unwrap(); let base_client = &client.base_client; @@ -2623,7 +2702,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let response = client.sync(sync_settings).await.unwrap(); + let response = client.sync_once(sync_settings).await.unwrap(); assert_ne!(response.next_batch, ""); @@ -2645,7 +2724,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let mut names = vec![]; for r in client.joined_rooms().read().await.values() { diff --git a/matrix_sdk/src/lib.rs b/matrix_sdk/src/lib.rs index 3dd243b5..078de2e1 100644 --- a/matrix_sdk/src/lib.rs +++ b/matrix_sdk/src/lib.rs @@ -71,7 +71,7 @@ mod device; #[cfg(feature = "encryption")] mod sas; -pub use client::{Client, ClientConfig, SyncSettings}; +pub use client::{Client, ClientConfig, LoopCtrl, SyncSettings}; #[cfg(feature = "encryption")] #[cfg_attr(feature = "docs", doc(cfg(encryption)))] pub use device::Device;