diff --git a/matrix_sdk/examples/autojoin.rs b/matrix_sdk/examples/autojoin.rs index ece720e7..2af9a61b 100644 --- a/matrix_sdk/examples/autojoin.rs +++ b/matrix_sdk/examples/autojoin.rs @@ -64,9 +64,7 @@ async fn login_and_sync( .add_event_emitter(Box::new(AutoJoinBot::new(client.clone()))) .await; - client - .sync_forever(SyncSettings::default(), |_| async {}) - .await; + client.sync(SyncSettings::default()).await; Ok(()) } diff --git a/matrix_sdk/examples/command_bot.rs b/matrix_sdk/examples/command_bot.rs index 9668b366..c2508440 100644 --- a/matrix_sdk/examples/command_bot.rs +++ b/matrix_sdk/examples/command_bot.rs @@ -91,7 +91,7 @@ async fn login_and_sync( // An initial sync to set up state and so our bot doesn't respond to old messages. // If the `StateStore` finds saved state in the location given the initial sync will // be skipped in favor of loading state from the store - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); // add our CommandBot to be notified of incoming messages, we do this after the initial // sync to avoid responding to messages before the bot was running. client @@ -102,7 +102,7 @@ async fn login_and_sync( // `sync_forever` let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); // this keeps state from the server streaming in to CommandBot via the EventEmitter trait - client.sync_forever(settings, |_| async {}).await; + client.sync(settings).await; Ok(()) } diff --git a/matrix_sdk/examples/emoji_verification.rs b/matrix_sdk/examples/emoji_verification.rs index 4b05d9c3..1f831653 100644 --- a/matrix_sdk/examples/emoji_verification.rs +++ b/matrix_sdk/examples/emoji_verification.rs @@ -69,7 +69,7 @@ async fn login( let client_ref = &client; client - .sync_forever(SyncSettings::new(), |response| async move { + .sync_with_callback(SyncSettings::new(), |response| async move { let client = &client_ref; for event in &response.to_device.events { @@ -116,6 +116,8 @@ async fn login( _ => (), } } + + false }) .await; diff --git a/matrix_sdk/examples/image_bot.rs b/matrix_sdk/examples/image_bot.rs index 9479545c..68099d89 100644 --- a/matrix_sdk/examples/image_bot.rs +++ b/matrix_sdk/examples/image_bot.rs @@ -81,13 +81,13 @@ async fn login_and_sync( .login(&username, &password, None, Some("command bot")) .await?; - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); client .add_event_emitter(Box::new(ImageBot::new(client.clone(), image))) .await; let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); - client.sync_forever(settings, |_| async {}).await; + client.sync(settings).await; Ok(()) } diff --git a/matrix_sdk/examples/login.rs b/matrix_sdk/examples/login.rs index 875a3ee9..b63155b5 100644 --- a/matrix_sdk/examples/login.rs +++ b/matrix_sdk/examples/login.rs @@ -52,7 +52,7 @@ async fn login( client .login(username, password, None, Some("rust-sdk")) .await?; - client.sync_forever(SyncSettings::new(), |_| async {}).await; + client.sync(SyncSettings::new()).await; Ok(()) } diff --git a/matrix_sdk/src/client.rs b/matrix_sdk/src/client.rs index 456eb579..33bf1a11 100644 --- a/matrix_sdk/src/client.rs +++ b/matrix_sdk/src/client.rs @@ -1341,14 +1341,20 @@ impl Client { /// Synchronize the client's state with the latest state on the server. /// - /// If a `StateStore` is provided and this is the initial sync state will - /// be loaded from the state store. + /// **Note**: You should not use this method to repeatedly sync if encryption + /// support is enabled, the [`sync`] method will make additional + /// requests between syncs that are needed for E2E encryption to work. /// /// # Arguments /// /// * `sync_settings` - Settings for the sync call. + /// + /// [`sync`]: #method.sync #[instrument] - pub async fn sync(&self, sync_settings: SyncSettings<'_>) -> Result { + pub async fn sync_once( + &self, + sync_settings: SyncSettings<'_>, + ) -> Result { let request = assign!(sync_events::Request::new(), { filter: sync_settings.filter, since: sync_settings.token.as_deref(), @@ -1366,6 +1372,23 @@ impl Client { Ok(response) } + /// Repeatedly call sync to synchronize the client state with the server. + /// + /// This method will never return, if cancellation is needed the method + /// should be wrapped in a cancelable task or the [`sync_with_callback`] + /// method can be used. + /// + /// # Arguments + /// + /// * `sync_settings` - Settings for the sync call. Note that those settings + /// will be only used for the first sync call. + /// + /// [`sync_with_callback`]: #method.sync_with_callback + pub async fn sync(&self, sync_settings: SyncSettings<'_>) { + self.sync_with_callback(sync_settings, |_| async { false }) + .await + } + /// Repeatedly call sync to synchronize the client state with the server. /// /// # Arguments @@ -1374,7 +1397,10 @@ impl Client { /// will be only used for the first sync call. /// /// * `callback` - A callback that will be called every time a successful - /// response has been fetched from the server. + /// response has been fetched from the server. The callback must return + /// a boolean which signalizes if the method should stop syncing. If the + /// callback returns `false` the sync will continue, if the callback + /// returns `true` the sync will be stopped. /// /// # Examples /// @@ -1417,17 +1443,19 @@ impl Client { /// } /// } /// } + /// + /// false /// }) /// .await; /// }) /// ``` #[instrument(skip(callback))] - pub async fn sync_forever( + pub async fn sync_with_callback( &self, sync_settings: SyncSettings<'_>, callback: impl Fn(sync_events::Response) -> C, ) where - C: Future, + C: Future, { let mut sync_settings = sync_settings; let filter = sync_settings.filter; @@ -1438,7 +1466,7 @@ impl Client { } loop { - let response = self.sync(sync_settings.clone()).await; + let response = self.sync_once(sync_settings.clone()).await; let response = match response { Ok(r) => r, @@ -1478,7 +1506,9 @@ impl Client { } } - callback(response).await; + if callback(response).await { + return; + } let now = Instant::now(); @@ -1943,7 +1973,7 @@ mod test { let room = client.get_joined_room(&room_id).await; assert!(room.is_none()); - client.sync(SyncSettings::default()).await.unwrap(); + client.sync_once(SyncSettings::default()).await.unwrap(); let room = client.get_left_room(&room_id).await; assert!(room.is_none()); @@ -1958,7 +1988,10 @@ mod test { joined_client.restore_login(session).await.unwrap(); // joined room reloaded from state store - joined_client.sync(SyncSettings::default()).await.unwrap(); + joined_client + .sync_once(SyncSettings::default()) + .await + .unwrap(); let room = joined_client.get_joined_room(&room_id).await; assert!(room.is_some()); @@ -1970,7 +2003,10 @@ mod test { .with_body(test_json::LEAVE_SYNC_EVENT.to_string()) .create(); - joined_client.sync(SyncSettings::default()).await.unwrap(); + joined_client + .sync_once(SyncSettings::default()) + .await + .unwrap(); let room = joined_client.get_joined_room(&room_id).await; assert!(room.is_none()); @@ -1993,7 +2029,7 @@ mod test { .create(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); // let bc = &client.base_client; // let ignored_users = bc.ignored_users.read().await; @@ -2462,7 +2498,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let rooms_lock = &client.base_client.joined_rooms(); let rooms = rooms_lock.read().await; @@ -2490,7 +2526,7 @@ mod test { .create(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let mut room_names = vec![]; for room in client.joined_rooms().read().await.values() { @@ -2513,7 +2549,7 @@ mod test { .with_body(test_json::INVITE_SYNC.to_string()) .create(); - let _response = client.sync(SyncSettings::default()).await.unwrap(); + let _response = client.sync_once(SyncSettings::default()).await.unwrap(); assert!(client.joined_rooms().read().await.is_empty()); assert!(client.left_rooms().read().await.is_empty()); @@ -2538,7 +2574,7 @@ mod test { .with_body(test_json::LEAVE_SYNC.to_string()) .create(); - let _response = client.sync(SyncSettings::default()).await.unwrap(); + let _response = client.sync_once(SyncSettings::default()).await.unwrap(); assert!(client.joined_rooms().read().await.is_empty()); assert!(!client.left_rooms().read().await.is_empty()); @@ -2582,14 +2618,14 @@ mod test { let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); // gather state to save to the db, the first time through loading will be skipped - let _ = client.sync(sync_settings.clone()).await.unwrap(); + let _ = client.sync_once(sync_settings.clone()).await.unwrap(); // now syncing the client will update from the state store let config = ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); let client = Client::new_with_config(homeserver, config).unwrap(); client.restore_login(session.clone()).await.unwrap(); - client.sync(sync_settings).await.unwrap(); + client.sync_once(sync_settings).await.unwrap(); let base_client = &client.base_client; @@ -2622,7 +2658,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let response = client.sync(sync_settings).await.unwrap(); + let response = client.sync_once(sync_settings).await.unwrap(); assert_ne!(response.next_batch, ""); @@ -2644,7 +2680,7 @@ mod test { let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - let _response = client.sync(sync_settings).await.unwrap(); + let _response = client.sync_once(sync_settings).await.unwrap(); let mut names = vec![]; for r in client.joined_rooms().read().await.values() {