async_client: Add a method to upload encryption keys.

master
Damir Jelić 2020-03-11 11:42:59 +01:00
parent e7d6a5a834
commit bc4416e7cd
3 changed files with 70 additions and 22 deletions

View File

@ -17,7 +17,7 @@ encryption = ["olm-rs", "serde/derive", "serde_json", "cjson"]
[dependencies] [dependencies]
js_int = "0.1.2" js_int = "0.1.2"
futures = "0.3.4" futures = "0.3.4"
reqwest = "0.10.2" reqwest = "0.10.4"
http = "0.2.0" http = "0.2.0"
async-std = "1.5.0" async-std = "1.5.0"
ruma-api = "0.13.0" ruma-api = "0.13.0"
@ -32,9 +32,10 @@ olm-rs = { git = "https://gitlab.gnome.org/jhaye/olm-rs/", optional = true, feat
serde = { version = "1.0.104", optional = true, features = ["derive"] } serde = { version = "1.0.104", optional = true, features = ["derive"] }
serde_json = { version = "1.0.48", optional = true } serde_json = { version = "1.0.48", optional = true }
cjson = { version = "0.1.0", optional = true } cjson = { version = "0.1.0", optional = true }
tokio = { version = "0.2.13", default-features = false, features = ["sync"] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "0.2.11", features = ["full"] } tokio = { version = "0.2.13", features = ["full"] }
async-std = { version = "1.5.0", features = ["unstable", "attributes"] } async-std = { version = "1.5.0", features = ["unstable", "attributes"] }
url = "2.1.1" url = "2.1.1"
mockito = "0.23.3" mockito = "0.23.3"

View File

@ -16,8 +16,9 @@
use futures::future::{BoxFuture, Future, FutureExt}; use futures::future::{BoxFuture, Future, FutureExt};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock as SyncLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use async_std::task::sleep; use async_std::task::sleep;
@ -41,8 +42,9 @@ use crate::error::{Error, InnerError};
use crate::session::Session; use crate::session::Session;
use crate::VERSION; use crate::VERSION;
type RoomEventCallback = type RoomEventCallback = Box<
Box<dyn FnMut(Arc<RwLock<Room>>, Arc<EventResult<RoomEvent>>) -> BoxFuture<'static, ()> + Send>; dyn FnMut(Arc<SyncLock<Room>>, Arc<EventResult<RoomEvent>>) -> BoxFuture<'static, ()> + Send,
>;
const DEFAULT_SYNC_TIMEOUT: u64 = 30000; const DEFAULT_SYNC_TIMEOUT: u64 = 30000;
@ -174,6 +176,8 @@ impl SyncSettings {
} }
} }
#[cfg(feature = "encryption")]
use api::r0::keys::upload_keys;
use api::r0::message::create_message_event; use api::r0::message::create_message_event;
use api::r0::session::login; use api::r0::session::login;
use api::r0::sync::sync_events; use api::r0::sync::sync_events;
@ -246,8 +250,10 @@ impl AsyncClient {
} }
/// Is the client logged in. /// Is the client logged in.
pub fn logged_in(&self) -> bool { pub async fn logged_in(&self) -> bool {
self.base_client.read().unwrap().logged_in() // TODO turn this into a atomic bool so this method doesn't need to be
// async.
self.base_client.read().await.logged_in()
} }
/// The Homeserver of the client. /// The Homeserver of the client.
@ -307,7 +313,7 @@ impl AsyncClient {
/// ``` /// ```
pub fn add_event_callback<C: 'static>( pub fn add_event_callback<C: 'static>(
&mut self, &mut self,
mut callback: impl FnMut(Arc<RwLock<Room>>, Arc<EventResult<RoomEvent>>) -> C + 'static + Send, mut callback: impl FnMut(Arc<SyncLock<Room>>, Arc<EventResult<RoomEvent>>) -> C + 'static + Send,
) where ) where
C: Future<Output = ()> + Send, C: Future<Output = ()> + Send,
{ {
@ -346,8 +352,8 @@ impl AsyncClient {
}; };
let response = self.send(request).await?; let response = self.send(request).await?;
let mut client = self.base_client.write().unwrap(); let mut client = self.base_client.write().await;
client.receive_login_response(&response); client.receive_login_response(&response).await;
Ok(response) Ok(response)
} }
@ -375,7 +381,7 @@ impl AsyncClient {
let room_id = room_id.to_string(); let room_id = room_id.to_string();
let matrix_room = { let matrix_room = {
let mut client = self.base_client.write().unwrap(); let mut client = self.base_client.write().await;
for event in &room.state.events { for event in &room.state.events {
if let EventResult::Ok(e) = event { if let EventResult::Ok(e) = event {
@ -388,7 +394,7 @@ impl AsyncClient {
for event in &room.timeline.events { for event in &room.timeline.events {
{ {
let mut client = self.base_client.write().unwrap(); let mut client = self.base_client.write().await;
client.receive_joined_timeline_event(&room_id, &event); client.receive_joined_timeline_event(&room_id, &event);
} }
@ -409,11 +415,11 @@ impl AsyncClient {
cb.await; cb.await;
} }
} }
let mut client = self.base_client.write().unwrap();
client.receive_sync_response(&response);
} }
let mut client = self.base_client.write().await;
client.receive_sync_response(&response);
Ok(response) Ok(response)
} }
@ -513,8 +519,12 @@ impl AsyncClient {
sync_settings = SyncSettings::new() sync_settings = SyncSettings::new()
.timeout(DEFAULT_SYNC_TIMEOUT) .timeout(DEFAULT_SYNC_TIMEOUT)
.unwrap() .expect("Default sync timeout doesn't contain a valid value")
.token(self.sync_token().unwrap()); .token(
self.sync_token()
.await
.expect("No sync token found after initial sync"),
);
} }
} }
@ -550,7 +560,7 @@ impl AsyncClient {
}; };
let request_builder = if Request::METADATA.requires_authentication { let request_builder = if Request::METADATA.requires_authentication {
let client = self.base_client.read().unwrap(); let client = self.base_client.read().await;
if let Some(ref session) = client.session { if let Some(ref session) = client.session {
request_builder.bearer_auth(&session.access_token) request_builder.bearer_auth(&session.access_token)
@ -601,9 +611,41 @@ impl AsyncClient {
Ok(response) Ok(response)
} }
/// Upload the E2E encryption keys.
///
/// This uploads the long lived device keys as well as the required amount
/// of one-time keys.
///
/// # Panics
///
/// Panics if the client isn't logged in, or if no encryption keys need to
/// be uploaded.
#[cfg(feature = "encryption")]
async fn keys_upload(&self) -> Result<upload_keys::Response, Error> {
let (device_keys, one_time_keys) = self
.base_client
.read()
.await
.keys_for_upload()
.await
.expect("Keys don't need to be uploaded");
let request = upload_keys::Request {
device_keys,
one_time_keys,
};
let response = self.send(request).await?;
self.base_client
.write()
.await
.receive_keys_upload_response(&response)
.await;
Ok(response)
}
/// Get the current, if any, sync token of the client. /// Get the current, if any, sync token of the client.
/// This will be None if the client didn't sync at least once. /// This will be None if the client didn't sync at least once.
pub fn sync_token(&self) -> Option<String> { pub async fn sync_token(&self) -> Option<String> {
self.base_client.read().unwrap().sync_token.clone() self.base_client.read().await.sync_token.clone()
} }
} }

View File

@ -24,7 +24,8 @@ fn login() {
rt.block_on(client.login("example", "wordpass", None)) rt.block_on(client.login("example", "wordpass", None))
.unwrap(); .unwrap();
assert!(client.logged_in(), "Clint should be logged in"); let logged_in = rt.block_on(client.logged_in());
assert!(logged_in, "Clint should be logged in");
} }
#[test] #[test]
@ -51,5 +52,9 @@ fn sync() {
let sync_settings = SyncSettings::new().timeout(3000).unwrap(); let sync_settings = SyncSettings::new().timeout(3000).unwrap();
rt.block_on(client.sync(sync_settings)).unwrap(); let response = rt.block_on(client.sync(sync_settings)).unwrap();
assert_ne!(response.next_batch, "");
assert!(rt.block_on(client.sync_token()).is_some());
} }