Merge branch 'master' into messages

This commit is contained in:
Devin R 2020-05-06 06:53:42 -04:00
commit f13189d154
9 changed files with 181 additions and 156 deletions

View file

@ -28,7 +28,7 @@ use futures::future::Future;
use tokio::sync::RwLock;
use tokio::time::delay_for as sleep;
#[cfg(feature = "encryption")]
use tracing::debug;
use tracing::{debug, warn};
use tracing::{info, instrument, trace};
use http::Method as HttpMethod;
@ -37,7 +37,7 @@ use reqwest::header::{HeaderValue, InvalidHeaderValue};
use url::Url;
use crate::events::room::message::MessageEventContent;
use crate::events::{EventJson, EventType};
use crate::events::EventType;
use crate::identifiers::{RoomId, RoomIdOrAliasId, UserId};
use crate::Endpoint;
@ -212,8 +212,6 @@ impl SyncSettings {
}
}
#[cfg(feature = "encryption")]
use api::r0::client_exchange::send_event_to_device;
#[cfg(feature = "encryption")]
use api::r0::keys::{claim_keys, get_keys, upload_keys, KeyAlgorithm};
use api::r0::membership::join_room_by_id;
@ -229,6 +227,8 @@ use api::r0::message::get_message_events;
use api::r0::room::create_room;
use api::r0::session::login;
use api::r0::sync::sync_events;
#[cfg(feature = "encryption")]
use api::r0::to_device::send_event_to_device;
impl AsyncClient {
/// Creates a new client for making HTTP requests to the given homeserver.
@ -437,11 +437,11 @@ impl AsyncClient {
pub async fn join_room_by_id_or_alias(
&self,
alias: &RoomIdOrAliasId,
server_name: &str,
server_names: &[String],
) -> Result<join_room_by_id_or_alias::Response> {
let request = join_room_by_id_or_alias::Request {
room_id_or_alias: alias.clone(),
server_name: server_name.to_owned(),
server_name: server_names.to_owned(),
third_party_signed: None,
};
self.send(request).await
@ -694,14 +694,16 @@ impl AsyncClient {
}
// look at AccountData to further cut down users by collecting ignored users
for account_data in &mut room.account_data.events {
{
if let Ok(e) = account_data.deserialize() {
let mut client = self.base_client.write().await;
if client.receive_account_data_event(&room_id, &e).await {
updated = true;
if let Some(account_data) = &room.account_data {
for account_data in &account_data.events {
{
if let Ok(e) = account_data.deserialize() {
let mut client = self.base_client.write().await;
if client.receive_account_data_event(&room_id, &e).await {
updated = true;
}
client.emit_account_data_event(room_id, &e).await;
}
client.emit_account_data_event(room_id, &e).await;
}
}
}
@ -833,11 +835,19 @@ impl AsyncClient {
#[cfg(feature = "encryption")]
{
if self.base_client.read().await.should_upload_keys().await {
let _ = self.keys_upload().await;
let response = self.keys_upload().await;
if let Err(e) = response {
warn!("Error while uploading E2EE keys {:?}", e);
}
}
if self.base_client.read().await.should_query_keys().await {
let _ = self.keys_query().await;
let response = self.keys_query().await;
if let Err(e) = response {
warn!("Error while querying device keys {:?}", e);
}
}
}
@ -972,11 +982,13 @@ impl AsyncClient {
pub async fn room_send(
&self,
room_id: &RoomId,
#[allow(unused_mut)] mut content: MessageEventContent,
content: MessageEventContent,
txn_id: Option<Uuid>,
) -> Result<create_message_event::Response> {
#[allow(unused_mut)]
let mut event_type = EventType::RoomMessage;
#[allow(unused_mut)]
let mut raw_content = serde_json::value::to_raw_value(&content)?;
#[cfg(feature = "encryption")]
{
@ -1019,12 +1031,14 @@ impl AsyncClient {
self.share_group_session(room_id).await?;
}
content = self
.base_client
.read()
.await
.encrypt(room_id, content)
.await?;
raw_content = serde_json::value::to_raw_value(
&self
.base_client
.read()
.await
.encrypt(room_id, content)
.await?,
)?;
event_type = EventType::RoomEncrypted;
}
}
@ -1033,7 +1047,7 @@ impl AsyncClient {
room_id: room_id.clone(),
event_type,
txn_id: txn_id.unwrap_or_else(Uuid::new_v4).to_string(),
data: EventJson::from(content),
data: raw_content,
};
let response = self.send(request).await?;

View file

@ -41,15 +41,15 @@ use crate::EventEmitter;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
#[cfg(feature = "encryption")]
use crate::api::r0::client_exchange::send_event_to_device;
#[cfg(feature = "encryption")]
use crate::api::r0::keys::{
claim_keys::Response as KeysClaimResponse, get_keys::Response as KeysQueryResponse,
upload_keys::Response as KeysUploadResponse, DeviceKeys, KeyAlgorithm,
};
#[cfg(feature = "encryption")]
use crate::events::room::message::MessageEventContent;
use crate::api::r0::to_device::send_event_to_device;
#[cfg(feature = "encryption")]
use crate::events::room::{encrypted::EncryptedEventContent, message::MessageEventContent};
#[cfg(feature = "encryption")]
use crate::identifiers::DeviceId;
#[cfg(feature = "encryption")]
@ -532,18 +532,13 @@ impl Client {
&self,
room_id: &RoomId,
content: MessageEventContent,
) -> Result<MessageEventContent> {
) -> Result<EncryptedEventContent> {
let mut olm = self.olm.lock().await;
// TODO enable this again once we can send encrypted event
// contents with ruma.
// match &mut *olm {
// Some(o) => Ok(MessageEventContent::Encrypted(
// o.encrypt(room_id, content).await?,
// )),
// None => panic!("Olm machine wasn't started"),
// }
Ok(content)
match &mut *olm {
Some(o) => Ok(o.encrypt(room_id, content).await?),
None => panic!("Olm machine wasn't started"),
}
}
/// Get a tuple of device and one-time keys that need to be uploaded.

View file

@ -118,7 +118,6 @@ impl RoomMember {
None => false,
// we ignore the error here as only a buggy or malicious server would send this
Error => false,
_ => false,
}
}

View file

@ -20,39 +20,39 @@ async-trait = "0.1.30"
matrix-sdk-types = { path = "../matrix_sdk_types" }
olm-rs = { version = "0.5.0", features = ["serde"]}
olm-rs = { version = "0.5.0", features = ["serde"] }
serde = { version = "1.0.106", features = ["derive"] }
serde_json = { version = "1.0.51" }
cjson = { version = "0.1.0" }
serde_json = "1.0.52"
cjson = "0.1.0"
zeroize = { version = "1.1.0", features = ["zeroize_derive"] }
uuid = { version = "0.8.1", features = ["v4"] }
url = "2.1.1"
# Misc dependencies
thiserror = "1.0.14"
thiserror = "1.0.16"
tracing = "0.1.13"
atomic = "0.4.5"
dashmap = "3.10.0"
dashmap = "3.11.1"
[dependencies.tracing-futures]
version = "0.2.3"
version = "0.2.4"
default-features = false
features = ["std", "std-future"]
[dependencies.tokio]
version = "0.2.16"
version = "0.2.20"
default-features = false
features = ["sync", "time"]
[dependencies.sqlx]
version = "0.3.3"
version = "0.3.4"
optional = true
default-features = false
features = ["runtime-tokio", "sqlite"]
[dev-dependencies]
tokio = { version = "0.2.16", features = ["rt-threaded", "macros"] }
ruma-identifiers = { version = "0.16.0", features = ["rand"] }
serde_json = "1.0.51"
tokio = { version = "0.2.20", features = ["rt-threaded", "macros"] }
ruma-identifiers = { version = "0.16.1", features = ["rand"] }
serde_json = "1.0.52"
tempfile = "3.1.0"
http = "*"
http = "0.2.1"

View file

@ -138,7 +138,8 @@ impl Device {
device_keys
.unsigned
.as_ref()
.map(|d| d.device_display_name.clone()),
.map(|d| d.device_display_name.clone())
.flatten(),
);
let _ = mem::replace(
@ -202,7 +203,8 @@ impl From<&DeviceKeys> for Device {
device_keys
.unsigned
.as_ref()
.map(|d| d.device_display_name.clone()),
.map(|d| d.device_display_name.clone())
.flatten(),
),
deleted: Arc::new(AtomicBool::new(false)),
trust_state: Arc::new(Atomic::new(TrustState::Unset)),
@ -294,7 +296,7 @@ pub(crate) mod test {
let mut device_keys = device_keys();
device_keys.unsigned.as_mut().unwrap().device_display_name =
"Alice's work computer".to_owned();
Some("Alice's work computer".to_owned());
device.update_device(&device_keys);
assert_eq!(

View file

@ -17,6 +17,7 @@
#![deny(
missing_debug_implementations,
dead_code,
missing_docs,
trivial_casts,
trivial_numeric_casts,

View file

@ -49,12 +49,11 @@ use matrix_sdk_types::identifiers::{DeviceId, RoomId, UserId};
use api::r0::keys;
use api::r0::{
client_exchange::{send_event_to_device::Request as ToDeviceRequest, DeviceIdOrAllDevices},
keys::{AlgorithmAndDeviceId, DeviceKeys, KeyAlgorithm, OneTimeKey, SignedKey},
sync::sync_events::Response as SyncResponse,
to_device::{send_event_to_device::Request as ToDeviceRequest, DeviceIdOrAllDevices},
};
use cjson;
use serde_json::{json, Value};
use tracing::{debug, error, info, instrument, trace, warn};
@ -985,7 +984,7 @@ impl OlmMachine {
.map_err(|_| EventError::UnsupportedOlmType)?;
// Decrypt the OlmMessage and get a Ruma event out of it.
let (mut decrypted_event, signing_key) = self
let (decrypted_event, signing_key) = self
.decrypt_olm_message(&event.sender, &content.sender_key, message)
.await?;
@ -993,14 +992,23 @@ impl OlmMachine {
// Handle the decrypted event, e.g. fetch out Megolm sessions out of
// the event.
self.handle_decrypted_to_device_event(
&content.sender_key,
&signing_key,
&mut decrypted_event,
)
.await?;
Ok(decrypted_event)
if let Some(event) = self
.handle_decrypted_to_device_event(
&content.sender_key,
&signing_key,
&decrypted_event,
)
.await?
{
// Some events may have sensitive data e.g. private keys, while we
// wan't to notify our users that a private key was received we
// don't want them to be able to do silly things with it. Handling
// events modifies them and returns a modified one, so replace it
// here if we get one.
Ok(event)
} else {
Ok(decrypted_event)
}
} else {
warn!("Olm event doesn't contain a ciphertext for our key");
Err(EventError::MissingCiphertext.into())
@ -1013,7 +1021,7 @@ impl OlmMachine {
sender_key: &str,
signing_key: &str,
event: &mut ToDeviceRoomKey,
) -> OlmResult<()> {
) -> OlmResult<Option<EventJson<ToDeviceEvent>>> {
match event.content.algorithm {
Algorithm::MegolmV1AesSha2 => {
let session_key = GroupSessionKey(mem::take(&mut event.content.session_key));
@ -1025,14 +1033,24 @@ impl OlmMachine {
session_key,
)?;
let _ = self.store.save_inbound_group_session(session).await?;
Ok(())
// TODO ideally we would rewrap the event again just like so
// let event = EventJson::from(ToDeviceEvent::RoomKey(event.clone()));
// This saidly lacks a type once it's serialized again, fix
// this in Ruma.
let mut json = serde_json::to_value(event.clone())?;
json.as_object_mut()
.unwrap()
.insert("type".to_owned(), Value::String("m.room_key".to_owned()));
let event = serde_json::from_value::<EventJson<ToDeviceEvent>>(json)?;
Ok(Some(event))
}
_ => {
warn!(
"Received room key with unsupported key algorithm {}",
event.content.algorithm
);
Ok(())
Ok(None)
}
}
}
@ -1292,10 +1310,10 @@ impl OlmMachine {
// TODO enable this again once we can send encrypted event
// contents with ruma.
// user_messages.insert(
// DeviceIdOrAllDevices::DeviceId(device.device_id().clone()),
// EventJson::from(MessageEventContent::Encrypted(encrypted_content)),
// );
user_messages.insert(
DeviceIdOrAllDevices::DeviceId(device.device_id().clone()),
serde_json::value::to_raw_value(&encrypted_content)?,
);
}
message_vec.push(ToDeviceRequest {
@ -1331,25 +1349,26 @@ impl OlmMachine {
&mut self,
sender_key: &str,
signing_key: &str,
event: &mut EventJson<ToDeviceEvent>,
) -> OlmResult<()> {
event: &EventJson<ToDeviceEvent>,
) -> OlmResult<Option<EventJson<ToDeviceEvent>>> {
let event = if let Ok(e) = event.deserialize() {
e
} else {
warn!("Decrypted to-device event failed to be parsed correctly");
return Ok(());
return Ok(None);
};
match event {
ToDeviceEvent::RoomKey(mut e) => {
self.add_room_key(sender_key, signing_key, &mut e).await
Ok(self.add_room_key(sender_key, signing_key, &mut e).await?)
}
ToDeviceEvent::ForwardedRoomKey(e) => {
self.add_forwarded_room_key(sender_key, signing_key, &e)
self.add_forwarded_room_key(sender_key, signing_key, &e)?;
Ok(None)
}
_ => {
warn!("Received a unexpected encrypted to-device event");
Ok(())
Ok(None)
}
}
}
@ -1546,7 +1565,7 @@ mod test {
use crate::Device;
use matrix_sdk_types::api::r0::{
client_exchange::send_event_to_device::Request as ToDeviceRequest, keys,
keys, to_device::send_event_to_device::Request as ToDeviceRequest,
};
use matrix_sdk_types::events::{
collections::all::RoomEvent,
@ -1603,7 +1622,6 @@ mod test {
.values()
.next()
.unwrap()
.json()
.get(),
)
.unwrap();
@ -1991,98 +2009,95 @@ mod test {
}
}
// TODO this is disabled so CI passes, we can't enable this until ruma gets
// the ability back to send encrypted content.
// #[tokio::test]
// async fn test_room_key_sharing() {
// let (mut alice, mut bob) = get_machine_pair_with_session().await;
#[tokio::test]
async fn test_room_key_sharing() {
let (mut alice, mut bob) = get_machine_pair_with_session().await;
// let room_id = RoomId::try_from("!test:example.org").unwrap();
let room_id = RoomId::try_from("!test:example.org").unwrap();
// let to_device_requests = alice
// .share_group_session(&room_id, [bob.user_id.clone()].iter())
// .await
// .unwrap();
let to_device_requests = alice
.share_group_session(&room_id, [bob.user_id.clone()].iter())
.await
.unwrap();
// let event = ToDeviceEncrypted {
// sender: alice.user_id.clone(),
// content: to_device_requests_to_content(to_device_requests),
// };
let event = ToDeviceEncrypted {
sender: alice.user_id.clone(),
content: to_device_requests_to_content(to_device_requests),
};
// let alice_session = alice.outbound_group_sessions.get(&room_id).unwrap();
let alice_session = alice.outbound_group_sessions.get(&room_id).unwrap();
// let event = bob.decrypt_to_device_event(&event).await.unwrap();
let event = bob.decrypt_to_device_event(&event).await.unwrap();
// if let AnyToDeviceEvent::RoomKey(e) = event.deserialize().unwrap() {
// assert_eq!(e.sender, alice.user_id);
// } else {
// panic!("Event had the wrong type");
// }
if let AnyToDeviceEvent::RoomKey(e) = event.deserialize().unwrap() {
assert_eq!(e.sender, alice.user_id);
assert!(e.content.session_key.is_empty())
} else {
panic!("Event had the wrong type");
}
// let session = bob
// .store
// .get_inbound_group_session(
// &room_id,
// alice.account.identity_keys().curve25519(),
// alice_session.session_id(),
// )
// .await;
let session = bob
.store
.get_inbound_group_session(
&room_id,
alice.account.identity_keys().curve25519(),
alice_session.session_id(),
)
.await;
// assert!(session.unwrap().is_some());
// }
assert!(session.unwrap().is_some());
}
// TODO this is disabled so CI passes, we can't enable this until ruma gets
// the ability back to send encrypted content.
// #[tokio::test]
// async fn test_megolm_encryption() {
// let (mut alice, mut bob) = get_machine_pair_with_setup_sessions().await;
// let room_id = RoomId::try_from("!test:example.org").unwrap();
#[tokio::test]
async fn test_megolm_encryption() {
let (mut alice, mut bob) = get_machine_pair_with_setup_sessions().await;
let room_id = RoomId::try_from("!test:example.org").unwrap();
// let to_device_requests = alice
// .share_group_session(&room_id, [bob.user_id().clone()].iter())
// .await
// .unwrap();
let to_device_requests = alice
.share_group_session(&room_id, [bob.user_id().clone()].iter())
.await
.unwrap();
// let event = ToDeviceEncrypted {
// sender: alice.user_id().clone(),
// content: to_device_requests_to_content(to_device_requests),
// };
let event = ToDeviceEncrypted {
sender: alice.user_id().clone(),
content: to_device_requests_to_content(to_device_requests),
};
// bob.decrypt_to_device_event(&event).await.unwrap();
bob.decrypt_to_device_event(&event).await.unwrap();
// let plaintext = "It is a secret to everybody";
let plaintext = "It is a secret to everybody";
// let content = MessageEventContent::Text(TextMessageEventContent::new_plain(plaintext));
let content = MessageEventContent::Text(TextMessageEventContent::new_plain(plaintext));
// let encrypted_content = alice.encrypt(&room_id, content.clone()).await.unwrap();
let encrypted_content = alice.encrypt(&room_id, content.clone()).await.unwrap();
// let event = EncryptedEvent {
// event_id: EventId::new("example.org").unwrap(),
// origin_server_ts: SystemTime::now(),
// room_id: Some(room_id.clone()),
// sender: alice.user_id().clone(),
// content: encrypted_content,
// unsigned: UnsignedData::default(),
// };
let event = EncryptedEvent {
event_id: EventId::new("example.org").unwrap(),
origin_server_ts: SystemTime::now(),
room_id: Some(room_id.clone()),
sender: alice.user_id().clone(),
content: encrypted_content,
unsigned: UnsignedData::default(),
};
// let decrypted_event = bob
// .decrypt_room_event(&event)
// .await
// .unwrap()
// .deserialize()
// .unwrap();
let decrypted_event = bob
.decrypt_room_event(&event)
.await
.unwrap()
.deserialize()
.unwrap();
// let decrypted_event = match decrypted_event {
// RoomEvent::RoomMessage(e) => e,
// _ => panic!("Decrypted room event has the wrong type"),
// };
let decrypted_event = match decrypted_event {
RoomEvent::RoomMessage(e) => e,
_ => panic!("Decrypted room event has the wrong type"),
};
// assert_eq!(&decrypted_event.sender, alice.user_id());
// assert_eq!(&decrypted_event.room_id, &Some(room_id));
// if let MessageEventContent::Text(c) = &decrypted_event.content {
// assert_eq!(&c.body, plaintext);
// } else {
// panic!("Decrypted event has a missmatched content");
// }
// }
assert_eq!(&decrypted_event.sender, alice.user_id());
assert_eq!(&decrypted_event.room_id, &Some(room_id));
if let MessageEventContent::Text(c) = &decrypted_event.content {
assert_eq!(&c.body, plaintext);
} else {
panic!("Decrypted event has a missmatched content");
}
}
}

View file

@ -23,7 +23,6 @@ use url::Url;
use async_trait::async_trait;
use olm_rs::PicklingMode;
use serde_json;
use sqlx::{query, query_as, sqlite::SqliteQueryAs, Connect, Executor, SqliteConnection};
use tokio::sync::Mutex;
use zeroize::Zeroizing;

View file

@ -12,7 +12,7 @@ version = "0.1.0"
[dependencies]
js_int = "0.1.5"
ruma-api = "0.16.0-rc.3"
ruma-client-api = "0.8.0-rc.5"
ruma-api = "0.16.0"
ruma-client-api = "0.8.0"
ruma-events = "0.21.0"
ruma-identifiers = "0.16.1"