Merge branch 'exhaustive-sync-events-conv'
commit
65d84c111b
|
@ -682,11 +682,35 @@ impl BaseClient {
|
|||
&self,
|
||||
response: api::sync::sync_events::Response,
|
||||
) -> Result<SyncResponse> {
|
||||
#[cfg(test)]
|
||||
let api::sync::sync_events::Response {
|
||||
next_batch,
|
||||
rooms,
|
||||
presence,
|
||||
account_data,
|
||||
to_device,
|
||||
device_lists,
|
||||
device_one_time_keys_count,
|
||||
__test_exhaustive: _,
|
||||
} = response;
|
||||
|
||||
#[cfg(not(test))]
|
||||
let api::sync::sync_events::Response {
|
||||
next_batch,
|
||||
rooms,
|
||||
presence,
|
||||
account_data,
|
||||
to_device,
|
||||
device_lists,
|
||||
device_one_time_keys_count,
|
||||
..
|
||||
} = response;
|
||||
|
||||
// The server might respond multiple times with the same sync token, in
|
||||
// that case we already received this response and there's nothing to
|
||||
// do.
|
||||
if self.sync_token.read().await.as_ref() == Some(&response.next_batch) {
|
||||
return Ok(SyncResponse::new(response.next_batch));
|
||||
if self.sync_token.read().await.as_ref() == Some(&next_batch) {
|
||||
return Ok(SyncResponse::new(next_batch));
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
|
@ -700,15 +724,10 @@ impl BaseClient {
|
|||
// decryptes to-device events, but leaves room events alone.
|
||||
// This makes sure that we have the deryption keys for the room
|
||||
// events at hand.
|
||||
o.receive_sync_changes(
|
||||
&response.to_device,
|
||||
&response.device_lists,
|
||||
&response.device_one_time_keys_count,
|
||||
)
|
||||
.await?
|
||||
o.receive_sync_changes(&to_device, &device_lists, &device_one_time_keys_count)
|
||||
.await?
|
||||
} else {
|
||||
response
|
||||
.to_device
|
||||
to_device
|
||||
.events
|
||||
.into_iter()
|
||||
.filter_map(|e| e.deserialize().ok())
|
||||
|
@ -717,20 +736,19 @@ impl BaseClient {
|
|||
}
|
||||
};
|
||||
#[cfg(not(feature = "encryption"))]
|
||||
let to_device = response
|
||||
.to_device
|
||||
let to_device = to_device
|
||||
.events
|
||||
.into_iter()
|
||||
.filter_map(|e| e.deserialize().ok())
|
||||
.collect::<Vec<AnyToDeviceEvent>>()
|
||||
.into();
|
||||
|
||||
let mut changes = StateChanges::new(response.next_batch.clone());
|
||||
let mut changes = StateChanges::new(next_batch.clone());
|
||||
let mut ambiguity_cache = AmbiguityCache::new(self.store.clone());
|
||||
|
||||
let mut rooms = Rooms::default();
|
||||
let mut new_rooms = Rooms::default();
|
||||
|
||||
for (room_id, new_info) in response.rooms.join {
|
||||
for (room_id, new_info) in rooms.join {
|
||||
let room = self
|
||||
.store
|
||||
.get_or_create_room(&room_id, RoomType::Joined)
|
||||
|
@ -799,7 +817,7 @@ impl BaseClient {
|
|||
.collect(),
|
||||
};
|
||||
|
||||
rooms.join.insert(
|
||||
new_rooms.join.insert(
|
||||
room_id,
|
||||
JoinedRoom::new(timeline, state, account_data, ephemeral, notification_count),
|
||||
);
|
||||
|
@ -807,7 +825,7 @@ impl BaseClient {
|
|||
changes.add_room(room_info);
|
||||
}
|
||||
|
||||
for (room_id, new_info) in response.rooms.leave {
|
||||
for (room_id, new_info) in rooms.leave {
|
||||
let room = self
|
||||
.store
|
||||
.get_or_create_room(&room_id, RoomType::Left)
|
||||
|
@ -840,12 +858,12 @@ impl BaseClient {
|
|||
.await;
|
||||
|
||||
changes.add_room(room_info);
|
||||
rooms
|
||||
new_rooms
|
||||
.leave
|
||||
.insert(room_id, LeftRoom::new(timeline, state, account_data));
|
||||
}
|
||||
|
||||
for (room_id, new_info) in response.rooms.invite {
|
||||
for (room_id, new_info) in rooms.invite {
|
||||
{
|
||||
let room = self
|
||||
.store
|
||||
|
@ -870,11 +888,10 @@ impl BaseClient {
|
|||
invite_state: state,
|
||||
};
|
||||
|
||||
rooms.invite.insert(room_id, room);
|
||||
new_rooms.invite.insert(room_id, room);
|
||||
}
|
||||
|
||||
let presence: BTreeMap<UserId, PresenceEvent> = response
|
||||
.presence
|
||||
let presence: BTreeMap<UserId, PresenceEvent> = presence
|
||||
.events
|
||||
.into_iter()
|
||||
.filter_map(|e| {
|
||||
|
@ -885,20 +902,20 @@ impl BaseClient {
|
|||
|
||||
changes.presence = presence;
|
||||
|
||||
self.handle_account_data(response.account_data.events, &mut changes)
|
||||
self.handle_account_data(account_data.events, &mut changes)
|
||||
.await;
|
||||
|
||||
changes.ambiguity_maps = ambiguity_cache.cache;
|
||||
|
||||
self.store.save_changes(&changes).await?;
|
||||
*self.sync_token.write().await = Some(response.next_batch.clone());
|
||||
*self.sync_token.write().await = Some(next_batch.clone());
|
||||
self.apply_changes(&changes).await;
|
||||
|
||||
info!("Processed a sync response in {:?}", now.elapsed());
|
||||
|
||||
let response = SyncResponse {
|
||||
next_batch: response.next_batch,
|
||||
rooms,
|
||||
next_batch,
|
||||
rooms: new_rooms,
|
||||
presence: Presence {
|
||||
events: changes.presence.into_iter().map(|(_, v)| v).collect(),
|
||||
},
|
||||
|
@ -906,9 +923,8 @@ impl BaseClient {
|
|||
events: changes.account_data.into_iter().map(|(_, e)| e).collect(),
|
||||
},
|
||||
to_device,
|
||||
device_lists: response.device_lists,
|
||||
device_one_time_keys_count: response
|
||||
.device_one_time_keys_count
|
||||
device_lists,
|
||||
device_one_time_keys_count: device_one_time_keys_count
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.into()))
|
||||
.collect(),
|
||||
|
|
|
@ -20,10 +20,10 @@ serde = "1.0.122"
|
|||
async-trait = "0.1.42"
|
||||
|
||||
[dependencies.ruma]
|
||||
version = "0.0.2"
|
||||
version = "0.0.3"
|
||||
git = "https://github.com/ruma/ruma"
|
||||
rev = "47d6b458574247545f8836b9421800f0089f3008"
|
||||
features = ["client-api", "compat", "unstable-pre-spec"]
|
||||
rev = "c816630058ab625d93ebab294e9e6c02dd9d866c"
|
||||
features = ["client-api-c", "compat", "unstable-pre-spec"]
|
||||
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||
uuid = { version = "0.8.2", default-features = false, features = ["v4", "serde"] }
|
||||
|
|
Loading…
Reference in New Issue