matrix-sdk: Add the ability to stop the sync loop and rename the sync methods.

This renames our sync methods so it's clearer which one the main one is.
Syncing should be done with the sync method, if one wishes to sync only
once the sync_method is provided.

If one wishes to have a callback called with every sync the
sync_with_callback method exists, the callback now returns a boolean
that signals if the loop should be aborted. This does not mean that the
current sync request will abort, a cancelable future is still needed for
this.
master
Damir Jelić 2020-10-06 11:37:29 +02:00
parent 9a5345ec77
commit 137fa9619f
6 changed files with 65 additions and 29 deletions

View File

@ -64,9 +64,7 @@ async fn login_and_sync(
.add_event_emitter(Box::new(AutoJoinBot::new(client.clone()))) .add_event_emitter(Box::new(AutoJoinBot::new(client.clone())))
.await; .await;
client client.sync(SyncSettings::default()).await;
.sync_forever(SyncSettings::default(), |_| async {})
.await;
Ok(()) Ok(())
} }

View File

@ -91,7 +91,7 @@ async fn login_and_sync(
// An initial sync to set up state and so our bot doesn't respond to old messages. // An initial sync to set up state and so our bot doesn't respond to old messages.
// If the `StateStore` finds saved state in the location given the initial sync will // If the `StateStore` finds saved state in the location given the initial sync will
// be skipped in favor of loading state from the store // be skipped in favor of loading state from the store
client.sync(SyncSettings::default()).await.unwrap(); client.sync_once(SyncSettings::default()).await.unwrap();
// add our CommandBot to be notified of incoming messages, we do this after the initial // add our CommandBot to be notified of incoming messages, we do this after the initial
// sync to avoid responding to messages before the bot was running. // sync to avoid responding to messages before the bot was running.
client client
@ -102,7 +102,7 @@ async fn login_and_sync(
// `sync_forever` // `sync_forever`
let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); let settings = SyncSettings::default().token(client.sync_token().await.unwrap());
// this keeps state from the server streaming in to CommandBot via the EventEmitter trait // this keeps state from the server streaming in to CommandBot via the EventEmitter trait
client.sync_forever(settings, |_| async {}).await; client.sync(settings).await;
Ok(()) Ok(())
} }

View File

@ -69,7 +69,7 @@ async fn login(
let client_ref = &client; let client_ref = &client;
client client
.sync_forever(SyncSettings::new(), |response| async move { .sync_with_callback(SyncSettings::new(), |response| async move {
let client = &client_ref; let client = &client_ref;
for event in &response.to_device.events { for event in &response.to_device.events {
@ -116,6 +116,8 @@ async fn login(
_ => (), _ => (),
} }
} }
false
}) })
.await; .await;

View File

@ -81,13 +81,13 @@ async fn login_and_sync(
.login(&username, &password, None, Some("command bot")) .login(&username, &password, None, Some("command bot"))
.await?; .await?;
client.sync(SyncSettings::default()).await.unwrap(); client.sync_once(SyncSettings::default()).await.unwrap();
client client
.add_event_emitter(Box::new(ImageBot::new(client.clone(), image))) .add_event_emitter(Box::new(ImageBot::new(client.clone(), image)))
.await; .await;
let settings = SyncSettings::default().token(client.sync_token().await.unwrap()); let settings = SyncSettings::default().token(client.sync_token().await.unwrap());
client.sync_forever(settings, |_| async {}).await; client.sync(settings).await;
Ok(()) Ok(())
} }

View File

@ -52,7 +52,7 @@ async fn login(
client client
.login(username, password, None, Some("rust-sdk")) .login(username, password, None, Some("rust-sdk"))
.await?; .await?;
client.sync_forever(SyncSettings::new(), |_| async {}).await; client.sync(SyncSettings::new()).await;
Ok(()) Ok(())
} }

View File

@ -1341,14 +1341,20 @@ impl Client {
/// Synchronize the client's state with the latest state on the server. /// Synchronize the client's state with the latest state on the server.
/// ///
/// If a `StateStore` is provided and this is the initial sync state will /// **Note**: You should not use this method to repeatedly sync if encryption
/// be loaded from the state store. /// support is enabled, the [`sync`] method will make additional
/// requests between syncs that are needed for E2E encryption to work.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `sync_settings` - Settings for the sync call. /// * `sync_settings` - Settings for the sync call.
///
/// [`sync`]: #method.sync
#[instrument] #[instrument]
pub async fn sync(&self, sync_settings: SyncSettings<'_>) -> Result<sync_events::Response> { pub async fn sync_once(
&self,
sync_settings: SyncSettings<'_>,
) -> Result<sync_events::Response> {
let request = assign!(sync_events::Request::new(), { let request = assign!(sync_events::Request::new(), {
filter: sync_settings.filter, filter: sync_settings.filter,
since: sync_settings.token.as_deref(), since: sync_settings.token.as_deref(),
@ -1366,6 +1372,23 @@ impl Client {
Ok(response) Ok(response)
} }
/// Repeatedly call sync to synchronize the client state with the server.
///
/// This method will never return, if cancellation is needed the method
/// should be wrapped in a cancelable task or the [`sync_with_callback`]
/// method can be used.
///
/// # Arguments
///
/// * `sync_settings` - Settings for the sync call. Note that those settings
/// will be only used for the first sync call.
///
/// [`sync_with_callback`]: #method.sync_with_callback
pub async fn sync(&self, sync_settings: SyncSettings<'_>) {
self.sync_with_callback(sync_settings, |_| async { false })
.await
}
/// Repeatedly call sync to synchronize the client state with the server. /// Repeatedly call sync to synchronize the client state with the server.
/// ///
/// # Arguments /// # Arguments
@ -1374,7 +1397,10 @@ impl Client {
/// will be only used for the first sync call. /// will be only used for the first sync call.
/// ///
/// * `callback` - A callback that will be called every time a successful /// * `callback` - A callback that will be called every time a successful
/// response has been fetched from the server. /// response has been fetched from the server. The callback must return
/// a boolean which signalizes if the method should stop syncing. If the
/// callback returns `false` the sync will continue, if the callback
/// returns `true` the sync will be stopped.
/// ///
/// # Examples /// # Examples
/// ///
@ -1417,17 +1443,19 @@ impl Client {
/// } /// }
/// } /// }
/// } /// }
///
/// false
/// }) /// })
/// .await; /// .await;
/// }) /// })
/// ``` /// ```
#[instrument(skip(callback))] #[instrument(skip(callback))]
pub async fn sync_forever<C>( pub async fn sync_with_callback<C>(
&self, &self,
sync_settings: SyncSettings<'_>, sync_settings: SyncSettings<'_>,
callback: impl Fn(sync_events::Response) -> C, callback: impl Fn(sync_events::Response) -> C,
) where ) where
C: Future<Output = ()>, C: Future<Output = bool>,
{ {
let mut sync_settings = sync_settings; let mut sync_settings = sync_settings;
let filter = sync_settings.filter; let filter = sync_settings.filter;
@ -1438,7 +1466,7 @@ impl Client {
} }
loop { loop {
let response = self.sync(sync_settings.clone()).await; let response = self.sync_once(sync_settings.clone()).await;
let response = match response { let response = match response {
Ok(r) => r, Ok(r) => r,
@ -1478,7 +1506,9 @@ impl Client {
} }
} }
callback(response).await; if callback(response).await {
return;
}
let now = Instant::now(); let now = Instant::now();
@ -1943,7 +1973,7 @@ mod test {
let room = client.get_joined_room(&room_id).await; let room = client.get_joined_room(&room_id).await;
assert!(room.is_none()); assert!(room.is_none());
client.sync(SyncSettings::default()).await.unwrap(); client.sync_once(SyncSettings::default()).await.unwrap();
let room = client.get_left_room(&room_id).await; let room = client.get_left_room(&room_id).await;
assert!(room.is_none()); assert!(room.is_none());
@ -1958,7 +1988,10 @@ mod test {
joined_client.restore_login(session).await.unwrap(); joined_client.restore_login(session).await.unwrap();
// joined room reloaded from state store // joined room reloaded from state store
joined_client.sync(SyncSettings::default()).await.unwrap(); joined_client
.sync_once(SyncSettings::default())
.await
.unwrap();
let room = joined_client.get_joined_room(&room_id).await; let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_some()); assert!(room.is_some());
@ -1970,7 +2003,10 @@ mod test {
.with_body(test_json::LEAVE_SYNC_EVENT.to_string()) .with_body(test_json::LEAVE_SYNC_EVENT.to_string())
.create(); .create();
joined_client.sync(SyncSettings::default()).await.unwrap(); joined_client
.sync_once(SyncSettings::default())
.await
.unwrap();
let room = joined_client.get_joined_room(&room_id).await; let room = joined_client.get_joined_room(&room_id).await;
assert!(room.is_none()); assert!(room.is_none());
@ -1993,7 +2029,7 @@ mod test {
.create(); .create();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync(sync_settings).await.unwrap(); let _response = client.sync_once(sync_settings).await.unwrap();
// let bc = &client.base_client; // let bc = &client.base_client;
// let ignored_users = bc.ignored_users.read().await; // let ignored_users = bc.ignored_users.read().await;
@ -2462,7 +2498,7 @@ mod test {
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync(sync_settings).await.unwrap(); let _response = client.sync_once(sync_settings).await.unwrap();
let rooms_lock = &client.base_client.joined_rooms(); let rooms_lock = &client.base_client.joined_rooms();
let rooms = rooms_lock.read().await; let rooms = rooms_lock.read().await;
@ -2490,7 +2526,7 @@ mod test {
.create(); .create();
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync(sync_settings).await.unwrap(); let _response = client.sync_once(sync_settings).await.unwrap();
let mut room_names = vec![]; let mut room_names = vec![];
for room in client.joined_rooms().read().await.values() { for room in client.joined_rooms().read().await.values() {
@ -2513,7 +2549,7 @@ mod test {
.with_body(test_json::INVITE_SYNC.to_string()) .with_body(test_json::INVITE_SYNC.to_string())
.create(); .create();
let _response = client.sync(SyncSettings::default()).await.unwrap(); let _response = client.sync_once(SyncSettings::default()).await.unwrap();
assert!(client.joined_rooms().read().await.is_empty()); assert!(client.joined_rooms().read().await.is_empty());
assert!(client.left_rooms().read().await.is_empty()); assert!(client.left_rooms().read().await.is_empty());
@ -2538,7 +2574,7 @@ mod test {
.with_body(test_json::LEAVE_SYNC.to_string()) .with_body(test_json::LEAVE_SYNC.to_string())
.create(); .create();
let _response = client.sync(SyncSettings::default()).await.unwrap(); let _response = client.sync_once(SyncSettings::default()).await.unwrap();
assert!(client.joined_rooms().read().await.is_empty()); assert!(client.joined_rooms().read().await.is_empty());
assert!(!client.left_rooms().read().await.is_empty()); assert!(!client.left_rooms().read().await.is_empty());
@ -2582,14 +2618,14 @@ mod test {
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000)); let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_millis(3000));
// gather state to save to the db, the first time through loading will be skipped // gather state to save to the db, the first time through loading will be skipped
let _ = client.sync(sync_settings.clone()).await.unwrap(); let _ = client.sync_once(sync_settings.clone()).await.unwrap();
// now syncing the client will update from the state store // now syncing the client will update from the state store
let config = let config =
ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap())); ClientConfig::default().state_store(Box::new(JsonStore::open(dir.path()).unwrap()));
let client = Client::new_with_config(homeserver, config).unwrap(); let client = Client::new_with_config(homeserver, config).unwrap();
client.restore_login(session.clone()).await.unwrap(); client.restore_login(session.clone()).await.unwrap();
client.sync(sync_settings).await.unwrap(); client.sync_once(sync_settings).await.unwrap();
let base_client = &client.base_client; let base_client = &client.base_client;
@ -2622,7 +2658,7 @@ mod test {
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let response = client.sync(sync_settings).await.unwrap(); let response = client.sync_once(sync_settings).await.unwrap();
assert_ne!(response.next_batch, ""); assert_ne!(response.next_batch, "");
@ -2644,7 +2680,7 @@ mod test {
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync(sync_settings).await.unwrap(); let _response = client.sync_once(sync_settings).await.unwrap();
let mut names = vec![]; let mut names = vec![];
for r in client.joined_rooms().read().await.values() { for r in client.joined_rooms().read().await.values() {