crypto: Calculate the device changes for a given user in parallel

master
Damir Jelić 2021-03-10 12:05:21 +01:00
parent 0c5d13cb91
commit c8d4cd0a5b
2 changed files with 84 additions and 44 deletions

View File

@ -72,7 +72,8 @@ pub fn keys_query(c: &mut Criterion) {
); );
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let machine = block_on(OlmMachine::new_with_default_store( let machine = runtime
.block_on(OlmMachine::new_with_default_store(
&alice_id(), &alice_id(),
&alice_device_id(), &alice_device_id(),
dir.path(), dir.path(),
@ -169,8 +170,12 @@ pub fn room_key_sharing(c: &mut Criterion) {
.fold(0, |acc, d| acc + d.len()); .fold(0, |acc, d| acc + d.len());
let machine = OlmMachine::new(&alice_id(), &alice_device_id()); let machine = OlmMachine::new(&alice_id(), &alice_device_id());
block_on(machine.mark_request_as_sent(&uuid, &keys_query_response)).unwrap(); runtime
block_on(machine.mark_request_as_sent(&uuid, &response)).unwrap(); .block_on(machine.mark_request_as_sent(&uuid, &keys_query_response))
.unwrap();
runtime
.block_on(machine.mark_request_as_sent(&uuid, &response))
.unwrap();
let mut group = c.benchmark_group("Room key sharing"); let mut group = c.benchmark_group("Room key sharing");
group.throughput(Throughput::Elements(count as u64)); group.throughput(Throughput::Elements(count as u64));
@ -204,8 +209,12 @@ pub fn room_key_sharing(c: &mut Criterion) {
None, None,
)) ))
.unwrap(); .unwrap();
block_on(machine.mark_request_as_sent(&uuid, &keys_query_response)).unwrap(); runtime
block_on(machine.mark_request_as_sent(&uuid, &response)).unwrap(); .block_on(machine.mark_request_as_sent(&uuid, &keys_query_response))
.unwrap();
runtime
.block_on(machine.mark_request_as_sent(&uuid, &response))
.unwrap();
group.bench_function(BenchmarkId::new("sled store", &name), |b| { group.bench_function(BenchmarkId::new("sled store", &name), |b| {
b.to_async(&runtime).iter(|| async { b.to_async(&runtime).iter(|| async {

View File

@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use futures::future::join_all;
use std::{ use std::{
collections::{BTreeMap, HashSet}, collections::{BTreeMap, HashSet},
convert::TryFrom, convert::TryFrom,
sync::Arc, sync::Arc,
}; };
use tracing::{info, trace, warn}; use tracing::{trace, warn};
use matrix_sdk_common::{ use matrix_sdk_common::{
api::r0::keys::get_keys::Response as KeysQueryResponse, api::r0::keys::get_keys::Response as KeysQueryResponse,
@ -35,6 +36,12 @@ use crate::{
store::{Changes, DeviceChanges, IdentityChanges, Result as StoreResult, Store}, store::{Changes, DeviceChanges, IdentityChanges, Result as StoreResult, Store},
}; };
enum DeviceChange {
New(ReadOnlyDevice),
Updated(ReadOnlyDevice),
None,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct IdentityManager { pub(crate) struct IdentityManager {
user_id: Arc<UserId>, user_id: Arc<UserId>,
@ -90,6 +97,43 @@ impl IdentityManager {
Ok((changed_devices, changed_identities)) Ok((changed_devices, changed_identities))
} }
async fn update_or_create_device(
store: Store,
device_keys: DeviceKeys,
) -> StoreResult<DeviceChange> {
let old_device = store
.get_readonly_device(&device_keys.user_id, &device_keys.device_id)
.await?;
if let Some(mut device) = old_device {
if let Err(e) = device.update_device(&device_keys) {
warn!(
"Failed to update the device keys for {} {}: {:?}",
device.user_id(),
device.device_id(),
e
);
Ok(DeviceChange::None)
} else {
Ok(DeviceChange::Updated(device))
}
} else {
match ReadOnlyDevice::try_from(&device_keys) {
Ok(d) => {
trace!("Adding a new device to the device store {:?}", d);
Ok(DeviceChange::New(d))
}
Err(e) => {
warn!(
"Failed to create a new device for {} {}: {:?}",
device_keys.user_id, device_keys.device_id, e
);
Ok(DeviceChange::None)
}
}
}
}
/// Handle the device keys part of a key query response. /// Handle the device keys part of a key query response.
/// ///
/// # Arguments /// # Arguments
@ -107,48 +151,35 @@ impl IdentityManager {
for (user_id, device_map) in device_keys_map { for (user_id, device_map) in device_keys_map {
// TODO move this out into the handle keys query response method // TODO move this out into the handle keys query response method
// since we might fail handle the new device at any point here or // since we might fail to handle the new device at any point here or
// when updating the user identities. // when updating the user identities.
self.store.update_tracked_user(user_id, false).await?; self.store.update_tracked_user(user_id, false).await?;
for (device_id, device_keys) in device_map.iter() { let tasks = device_map.iter().filter_map(|(device_id, device_keys)| {
// We don't need our own device in the device store. // We don't need our own device in the device store.
if user_id == self.user_id() && &**device_id == self.device_id() { if user_id == self.user_id() && &**device_id == self.device_id() {
continue; None
} } else if user_id != &device_keys.user_id || device_id != &device_keys.device_id {
if user_id != &device_keys.user_id || device_id != &device_keys.device_id {
warn!( warn!(
"Mismatch in device keys payload of device {}|{} from user {}|{}", "Mismatch in device keys payload of device {}|{} from user {}|{}",
device_id, device_keys.device_id, user_id, device_keys.user_id device_id, device_keys.device_id, user_id, device_keys.user_id
); );
continue; None
}
let device = self.store.get_readonly_device(&user_id, device_id).await?;
if let Some(mut device) = device {
if let Err(e) = device.update_device(device_keys) {
warn!(
"Failed to update the device keys for {} {}: {:?}",
user_id, device_id, e
);
continue;
}
changes.changed.push(device);
} else { } else {
let device = match ReadOnlyDevice::try_from(device_keys) { Some(tokio::spawn(Self::update_or_create_device(
Ok(d) => d, self.store.clone(),
Err(e) => { device_keys.clone(),
warn!( )))
"Failed to create a new device for {} {}: {:?}",
user_id, device_id, e
);
continue;
} }
}; });
info!("Adding a new device to the device store {:?}", device);
changes.new.push(device); let results = join_all(tasks).await;
for device in results {
match device.expect("Creating or updating a device panicked")? {
DeviceChange::New(d) => changes.new.push(d),
DeviceChange::Updated(d) => changes.changed.push(d),
DeviceChange::None => (),
} }
} }