From 137fa9619f1c2843bc8d3b9111be31d15324c7a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 6 Oct 2020 11:37:29 +0200 Subject: [PATCH] matrix-sdk: Add the ability to stop the sync loop and rename the sync methods. This renames our sync methods so it's clearer which one the main one is. Syncing should be done with the sync method, if one wishes to sync only once the sync_method is provided. If one wishes to have a callback called with every sync the sync_with_callback method exists, the callback now returns a boolean that signals if the loop should be aborted. This does not mean that the current sync request will abort, a cancelable future is still needed for this. --- matrix_sdk/examples/autojoin.rs | 4 +- matrix_sdk/examples/command_bot.rs | 4 +- matrix_sdk/examples/emoji_verification.rs | 4 +- matrix_sdk/examples/image_bot.rs | 4 +- matrix_sdk/examples/login.rs | 2 +- matrix_sdk/src/client.rs | 76 +++++++++++++++++------ 6 files changed, 65 insertions(+), 29 deletions(-) diff --git a/matrix_sdk/examples/autojoin.rs b/matrix_sdk/examples/autojoin.rs index ece720e7..2af9a61b 100644 --- a/matrix_sdk/examples/autojoin.rs +++ b/matrix_sdk/examples/autojoin.rs @@ -64,9 +64,7 @@ async fn login_and_sync( .add_event_emitter(Box::new(AutoJoinBot::new(client.clone()))) .await; - client - .sync_forever(SyncSettings::default(), |_| async {}) - .await; + client.sync(SyncSettings::default()).await; Ok(()) } diff --git a/matrix_sdk/examples/command_bot.rs b/matrix_sdk/examples/command_bot.rs index 9668b366..c2508440 100644 --- a/matrix_sdk/examples/command_bot.rs +++ b/matrix_sdk/examples/command_bot.rs @@ -91,7 +91,7 @@ async fn login_and_sync( // An initial sync to set up state and so our bot doesn't respond to old messages. // If the `StateStore` finds saved state in the location given the initial sync will // be skipped in favor of loading state from the store - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); // add our CommandBot to be notified of incoming messages, we do this after the initial // sync to avoid responding to messages before the bot was running. client @@ -102,7 +102,7 @@ async fn login_and_sync( // `sync_forever` let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); // this keeps state from the server streaming in to CommandBot via the EventEmitter trait - client.sync_forever(settings, |_| async {}).await; + client.sync(settings).await; Ok(()) } diff --git a/matrix_sdk/examples/emoji_verification.rs b/matrix_sdk/examples/emoji_verification.rs index 4b05d9c3..1f831653 100644 --- a/matrix_sdk/examples/emoji_verification.rs +++ b/matrix_sdk/examples/emoji_verification.rs @@ -69,7 +69,7 @@ async fn login( let client_ref = &client; client - .sync_forever(SyncSettings::new(), |response| async move { + .sync_with_callback(SyncSettings::new(), |response| async move { let client = &client_ref; for event in &response.to_device.events { @@ -116,6 +116,8 @@ async fn login( _ => (), } } + + false }) .await; diff --git a/matrix_sdk/examples/image_bot.rs b/matrix_sdk/examples/image_bot.rs index 9479545c..68099d89 100644 --- a/matrix_sdk/examples/image_bot.rs +++ b/matrix_sdk/examples/image_bot.rs @@ -81,13 +81,13 @@ async fn login_and_sync( .login(&username, &password, None, Some("command bot")) .await?; - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); client .add_event_emitter(Box::new(ImageBot::new(client.clone(), image))) .await; let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); - client.sync_forever(settings, |_| async {}).await; + client.sync(settings).await; Ok(()) } diff --git a/matrix_sdk/examples/login.rs b/matrix_sdk/examples/login.rs index 875a3ee9..b63155b5 100644 --- a/matrix_sdk/examples/login.rs +++ b/matrix_sdk/examples/login.rs @@ -52,7 +52,7 @@ async fn login( client .login(username, password, None, Some("rust-sdk")) .await?; - client.sync_forever(SyncSettings::new(), |_| async {}).await; + client.sync(SyncSettings::new()).await; Ok(()) } diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 456eb579..33bf1a11 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -1341,14 +1341,20 @@ impl Client { /// Synchronize the client's state with the latest state on the server. /// - /// If a `StateStore` is provided and this is the initial sync state will - /// be loaded from the state store. + /// **Note**: You should not use this method to repeatedly sync if encryption + /// support is enabled, the [`sync`] method will make additional + /// requests between syncs that are needed for E2E encryption to work. /// /// # Arguments /// /// * `sync_settings` - Settings for the sync call. + /// + /// [`sync`]: #method.sync #[instrument] - pub async fn sync(&self, sync_settings: SyncSettings<'_>) -> Result { + pub async fn sync_once( + &self, + sync_settings: SyncSettings<'_>, + ) -> Result { let request = assign!(sync_events::Request::new(), { filter: sync_settings.filter, since: sync_settings.token.as_deref(), @@ -1366,6 +1372,23 @@ impl Client { Ok(response) } + /// Repeatedly call sync to synchronize the client state with the server. + /// + /// This method will never return, if cancellation is needed the method + /// should be wrapped in a cancelable task or the [`sync_with_callback`] + /// method can be used. + /// + /// # Arguments + /// + /// * `sync_settings` - Settings for the sync call. Note that those settings + /// will be only used for the first sync call. + /// + /// [`sync_with_callback`]: #method.sync_with_callback + pub async fn sync(&self, sync_settings: SyncSettings<'_>) { + self.sync_with_callback(sync_settings, |_| async { false }) + .await + } + /// Repeatedly call sync to synchronize the client state with the server. /// /// # Arguments @@ -1374,7 +1397,10 @@ impl Client { /// will be only used for the first sync call. /// /// * `callback` - A callback that will be called every time a successful - /// response has been fetched from the server. + /// response has been fetched from the server. The callback must return + /// a boolean which signalizes if the method should stop syncing. If the + /// callback returns `false` the sync will continue, if the callback + /// returns `true` the sync will be stopped. /// /// # Examples /// @@ -1417,17 +1443,19 @@ impl Client { /// } /// } /// } + /// + /// false /// }) /// .await; /// }) /// ``` #[instrument(skip(callback))] - pub async fn sync_forever( + pub async fn sync_with_callback( &self, sync_settings: SyncSettings<'_>, callback: impl Fn(sync_events::Response) -> C, ) where - C: Future, + C: Future, { let mut sync_settings = sync_settings; let filter = sync_settings.filter; @@ -1438,7 +1466,7 @@ impl Client { } loop { - let response = self.sync(sync_settings.clone()).await; + let response = self.sync_once(sync_settings.clone()).await; let response = match response { Ok(r) => r, @@ -1478,7 +1506,9 @@ impl Client { } } - callback(response).await; + if callback(response).await { + return; + } let now = Instant::now(); @@ -1943,7 +1973,7 @@ mod test { let room = client.get_joined_room(&room_id).await; assert!(room.is_none()); - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); let room = client.get_left_room(&room_id).await; assert!(room.is_none()); @@ -1958,7 +1988,10 @@ mod test { joined_client.restore_login(session).await.unwrap(); // joined room reloaded from state store - joined_client.sync(SyncSettings::default()).await.unwrap(); + joined_client + .sync_once(SyncSettings::default()) + .await + .unwrap(); let room = joined_client.get_joined_room(&room_id).await; assert!(room.is_some()); @@ -1970,7 +2003,10 @@ mod test { .with_body(test_json::LEAVE_SYNC_EVENT.to_string()) .create(); - joined_client.sync(SyncSettings::default()).await.unwrap(); + joined_client + .sync_once(SyncSettings::default()) + .await + .unwrap(); let room = joined_client.get_joined_room(&room_id).await; assert!(room.is_none()); @@ -1993,7 +2029,7 @@ mod test { .create(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); // let bc = &client.base_client; // let ignored_users = bc.ignored_users.read().await; @@ -2462,7 +2498,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let rooms_lock = &client.base_client.joined_rooms(); let rooms = rooms_lock.read().await; @@ -2490,7 +2526,7 @@ mod test { .create(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let mut room_names = vec![]; for room in client.joined_rooms().read().await.values() { @@ -2513,7 +2549,7 @@ mod test { .with_body(test_json::INVITE_SYNC.to_string()) .create(); - let _response = client.sync(SyncSettings::default()).await.unwrap(); + let _response = client.sync_once(SyncSettings::default()).await.unwrap(); assert!(client.joined_rooms().read().await.is_empty()); assert!(client.left_rooms().read().await.is_empty()); @@ -2538,7 +2574,7 @@ mod test { .with_body(test_json::LEAVE_SYNC.to_string()) .create(); - let _response = client.sync(SyncSettings::default()).await.unwrap(); + let _response = client.sync_once(SyncSettings::default()).await.unwrap(); assert!(client.joined_rooms().read().await.is_empty()); assert!(!client.left_rooms().read().await.is_empty()); @@ -2582,14 +2618,14 @@ mod test { let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); // gather state to save to the db, the first time through loading will be skipped - let _ = client.sync(sync_settings.clone()).await.unwrap(); + let _ = client.sync_once(sync_settings.clone()).await.unwrap(); // now syncing the client will update from the state store let config = ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); let client = Client::new_with_config(homeserver, config).unwrap(); client.restore_login(session.clone()).await.unwrap(); - client.sync(sync_settings).await.unwrap(); + client.sync_once(sync_settings).await.unwrap(); let base_client = &client.base_client; @@ -2622,7 +2658,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let response = client.sync(sync_settings).await.unwrap(); + let response = client.sync_once(sync_settings).await.unwrap(); assert_ne!(response.next_batch, ""); @@ -2644,7 +2680,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let mut names = vec![]; for r in client.joined_rooms().read().await.values() {