crypto: Store and restore outbound group sessions

This commit is contained in:
Damir Jelić 2021-01-25 17:14:13 +01:00
parent ac6dad3f35
commit 6cb2c8b468
9 changed files with 281 additions and 33 deletions

View file

@ -24,7 +24,7 @@ mod inbound;
mod outbound;
pub use inbound::{InboundGroupSession, InboundGroupSessionPickle, PickledInboundGroupSession};
pub use outbound::{EncryptionSettings, OutboundGroupSession};
pub use outbound::{EncryptionSettings, OutboundGroupSession, PickledOutboundGroupSession};
/// The private session key of a group session.
/// Can be used to create a new inbound group session.

View file

@ -23,6 +23,7 @@ use matrix_sdk_common::{
};
use std::{
cmp::max,
collections::{BTreeMap, BTreeSet},
fmt,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@ -41,18 +42,24 @@ use matrix_sdk_common::{
instant::Instant,
locks::Mutex,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use olm_rs::outbound_group_session::OlmOutboundGroupSession;
pub use olm_rs::{
account::IdentityKeys,
session::{OlmMessage, PreKeyMessage},
utility::OlmUtility,
};
use olm_rs::{
errors::OlmGroupSessionError, outbound_group_session::OlmOutboundGroupSession, PicklingMode,
};
use crate::ToDeviceRequest;
use super::GroupSessionKey;
use super::{
super::{deserialize_instant, serialize_instant},
GroupSessionKey,
};
const ROTATION_PERIOD: Duration = Duration::from_millis(604800000);
const ROTATION_MESSAGES: u64 = 100;
@ -60,7 +67,7 @@ const ROTATION_MESSAGES: u64 = 100;
/// Settings for an encrypted room.
///
/// This determines the algorithm and rotation periods of a group session.
#[derive(Debug, Clone)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct EncryptionSettings {
/// The encryption algorithm that should be used in the room.
pub algorithm: EventEncryptionAlgorithm,
@ -158,7 +165,7 @@ impl OutboundGroupSession {
}
}
pub fn add_request(&self, request_id: Uuid, request: Arc<ToDeviceRequest>) {
pub(crate) fn add_request(&self, request_id: Uuid, request: Arc<ToDeviceRequest>) {
self.to_share_with_set.insert(request_id, request);
}
@ -383,6 +390,101 @@ impl OutboundGroupSession {
.map(|i| i.value().clone())
.collect()
}
/// Restore a Session from a previously pickled string.
///
/// Returns the restored group session or a `OlmGroupSessionError` if there
/// was an error.
///
/// # Arguments
///
/// * `device_id` - The device id of the device that created this session.
/// Put differently, our own device id.
///
/// * `identity_keys` - The identity keys of the device that created this
/// session, our own identity keys.
///
/// * `pickle` - The pickled version of the `OutboundGroupSession`.
///
/// * `pickle_mode` - The mode that was used to pickle the session, either
/// an unencrypted mode or an encrypted using passphrase.
pub fn from_pickle(
device_id: Arc<DeviceIdBox>,
identity_keys: Arc<IdentityKeys>,
pickle: PickledOutboundGroupSession,
pickling_mode: PicklingMode,
) -> Result<Self, OlmGroupSessionError> {
let inner = OlmOutboundGroupSession::unpickle(pickle.pickle.0, pickling_mode)?;
let session_id = inner.session_id();
Ok(Self {
inner: Arc::new(Mutex::new(inner)),
device_id,
account_identity_keys: identity_keys,
session_id: session_id.into(),
room_id: pickle.room_id,
creation_time: pickle.creation_time.into(),
message_count: AtomicU64::from(pickle.message_count).into(),
shared: AtomicBool::from(pickle.shared).into(),
invalidated: AtomicBool::from(pickle.invalidated).into(),
settings: pickle.settings,
shared_with_set: Arc::new(
pickle
.shared_with_set
.into_iter()
.map(|(k, v)| (k, v.into_iter().collect()))
.collect(),
),
to_share_with_set: Arc::new(pickle.requests.into_iter().collect()),
})
}
/// Store the group session as a base64 encoded string and associated data
/// belonging to the session.
///
/// # Arguments
///
/// * `pickle_mode` - The mode that should be used to pickle the group session,
/// either an unencrypted mode or an encrypted using passphrase.
pub async fn pickle(&self, pickling_mode: PicklingMode) -> PickledOutboundGroupSession {
let pickle: OutboundGroupSessionPickle =
self.inner.lock().await.pickle(pickling_mode).into();
PickledOutboundGroupSession {
pickle,
room_id: self.room_id.clone(),
settings: self.settings.clone(),
creation_time: *self.creation_time,
message_count: self.message_count.load(Ordering::SeqCst),
shared: self.shared(),
invalidated: self.invalidated(),
shared_with_set: self
.shared_with_set
.iter()
.map(|u| {
(
u.key().clone(),
#[allow(clippy::map_clone)]
u.value().iter().map(|d| d.clone()).collect(),
)
})
.collect(),
requests: self
.to_share_with_set
.iter()
.map(|r| (*r.key(), r.value().clone()))
.collect(),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OutboundGroupSessionPickle(String);
impl From<String> for OutboundGroupSessionPickle {
fn from(p: String) -> Self {
Self(p)
}
}
#[cfg(not(tarpaulin_include))]
@ -397,6 +499,36 @@ impl std::fmt::Debug for OutboundGroupSession {
}
}
/// A pickled version of an `InboundGroupSession`.
///
/// Holds all the information that needs to be stored in a database to restore
/// an InboundGroupSession.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PickledOutboundGroupSession {
/// The pickle string holding the OutboundGroupSession.
pub pickle: OutboundGroupSessionPickle,
/// The settings this session adheres to.
pub settings: Arc<EncryptionSettings>,
/// The room id this session is used for.
pub room_id: Arc<RoomId>,
/// The timestamp when this session was created.
#[serde(
deserialize_with = "deserialize_instant",
serialize_with = "serialize_instant"
)]
pub creation_time: Instant,
/// The number of messages this session has already encrypted.
pub message_count: u64,
/// Is the session shared.
pub shared: bool,
/// Has the session been invalidated.
pub invalidated: bool,
/// The set of users the session has been already shared with.
pub shared_with_set: BTreeMap<UserId, BTreeSet<DeviceIdBox>>,
/// Requests that need to be sent out to share the session.
pub requests: BTreeMap<Uuid, Arc<ToDeviceRequest>>,
}
#[cfg(test)]
mod test {
use std::time::Duration;

View file

@ -25,16 +25,39 @@ mod utility;
pub(crate) use account::{Account, OlmDecryptionInfo, SessionType};
pub use account::{AccountPickle, OlmMessageHash, PickledAccount, ReadOnlyAccount};
pub(crate) use group_sessions::GroupSessionKey;
pub use group_sessions::{
EncryptionSettings, ExportedRoomKey, InboundGroupSession, InboundGroupSessionPickle,
PickledInboundGroupSession,
OutboundGroupSession, PickledInboundGroupSession, PickledOutboundGroupSession,
};
pub(crate) use group_sessions::{GroupSessionKey, OutboundGroupSession};
pub use olm_rs::{account::IdentityKeys, PicklingMode};
pub use session::{PickledSession, Session, SessionPickle};
pub use signing::{PickledCrossSigningIdentity, PrivateCrossSigningIdentity};
pub(crate) use utility::Utility;
use matrix_sdk_common::instant::{Duration, Instant};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub(crate) fn serialize_instant<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let duration = instant.elapsed();
duration.serialize(serializer)
}
pub(crate) fn deserialize_instant<'de, D>(deserializer: D) -> Result<Instant, D::Error>
where
D: Deserializer<'de>,
{
let duration = Duration::deserialize(deserializer)?;
let now = Instant::now();
let instant = now
.checked_sub(duration)
.ok_or_else(|| serde::de::Error::custom("Can't substract the the current instant"))?;
Ok(instant)
}
#[cfg(test)]
pub(crate) mod test {
use crate::olm::{InboundGroupSession, ReadOnlyAccount, Session};

View file

@ -20,14 +20,13 @@ use matrix_sdk_common::{
EventType,
},
identifiers::{DeviceId, DeviceKeyAlgorithm, UserId},
instant::{Duration, Instant},
instant::Instant,
locks::Mutex,
};
use olm_rs::{errors::OlmSessionError, session::OlmSession, PicklingMode};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use super::IdentityKeys;
use crate::{
error::{EventError, OlmResult, SessionUnpicklingError},
ReadOnlyDevice,
@ -38,6 +37,8 @@ pub use olm_rs::{
utility::OlmUtility,
};
use super::{deserialize_instant, serialize_instant, IdentityKeys};
/// Cryptographic session that enables secure communication between two
/// `Account`s
#[derive(Clone)]
@ -187,9 +188,8 @@ impl Session {
PickledSession {
pickle: SessionPickle::from(pickle),
sender_key: self.sender_key.to_string(),
// FIXME this should use the duration from the unix epoch.
creation_time: self.creation_time.elapsed(),
last_use_time: self.last_use_time.elapsed(),
creation_time: *self.creation_time,
last_use_time: *self.last_use_time,
}
}
@ -220,16 +220,6 @@ impl Session {
let session = OlmSession::unpickle(pickle.pickle.0, pickle_mode)?;
let session_id = session.session_id();
// FIXME this should use the UNIX epoch.
let now = Instant::now();
let creation_time = now
.checked_sub(pickle.creation_time)
.ok_or(SessionUnpicklingError::SessionTimestampError)?;
let last_use_time = now
.checked_sub(pickle.last_use_time)
.ok_or(SessionUnpicklingError::SessionTimestampError)?;
Ok(Session {
user_id,
device_id,
@ -237,8 +227,8 @@ impl Session {
inner: Arc::new(Mutex::new(session)),
session_id: session_id.into(),
sender_key: pickle.sender_key.into(),
creation_time: Arc::new(creation_time),
last_use_time: Arc::new(last_use_time),
creation_time: Arc::new(pickle.creation_time),
last_use_time: Arc::new(pickle.last_use_time),
})
}
}
@ -260,9 +250,17 @@ pub struct PickledSession {
/// The curve25519 key of the other user that we share this session with.
pub sender_key: String,
/// The relative time elapsed since the session was created.
pub creation_time: Duration,
#[serde(
deserialize_with = "deserialize_instant",
serialize_with = "serialize_instant"
)]
pub creation_time: Instant,
/// The relative time elapsed since the session was last used.
pub last_use_time: Duration,
#[serde(
deserialize_with = "deserialize_instant",
serialize_with = "serialize_instant"
)]
pub last_use_time: Instant,
}
/// The typed representation of a base64 encoded string of the Olm Session pickle.

View file

@ -36,11 +36,12 @@ use matrix_sdk_common::{
uuid::Uuid,
};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue as RawJsonValue;
/// Customized version of `ruma_client_api::r0::to_device::send_event_to_device::Request`, using a
/// UUID for the transaction ID.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ToDeviceRequest {
/// Type of event being sent to each device.
pub event_type: EventType,

View file

@ -101,7 +101,13 @@ impl GroupSessionManager {
panic!("Session expired");
}
Ok(session.encrypt(content).await)
let content = session.encrypt(content).await;
let mut changes = Changes::default();
changes.outbound_group_sessions.push(session);
self.store.save_changes(changes).await?;
Ok(content)
}
/// Create a new outbound group session.
@ -130,8 +136,22 @@ impl GroupSessionManager {
room_id: &RoomId,
settings: EncryptionSettings,
) -> OlmResult<(OutboundGroupSession, Option<InboundGroupSession>)> {
#[allow(clippy::map_clone)]
if let Some(s) = self.outbound_group_sessions.get(room_id).map(|s| s.clone()) {
// Get the cached session, if there isn't one load one from the store
// and put it in the cache.
let outbound_session = if let Some(s) = self.outbound_group_sessions.get(room_id) {
Some(s.clone())
} else if let Some(s) = self.store.get_outbound_group_sessions(room_id).await? {
self.outbound_group_sessions
.insert(room_id.clone(), s.clone());
Some(s)
} else {
None
};
// If there is no session or the session has expired or is invalid,
// create a new one.
if let Some(s) = outbound_session {
if s.expired() || s.invalidated() {
self.create_outbound_group_session(room_id, settings)
.await
@ -294,6 +314,7 @@ impl GroupSessionManager {
.await?;
if let Some(inbound) = inbound {
changes.outbound_group_sessions.push(outbound.clone());
changes.inbound_group_sessions.push(inbound);
}
@ -303,6 +324,7 @@ impl GroupSessionManager {
let (outbound, inbound) = self
.create_outbound_group_session(room_id, encryption_settings)
.await?;
changes.outbound_group_sessions.push(outbound.clone());
changes.inbound_group_sessions.push(inbound);
debug!(

View file

@ -30,7 +30,7 @@ use super::{
};
use crate::{
identities::{ReadOnlyDevice, UserIdentities},
olm::PrivateCrossSigningIdentity,
olm::{OutboundGroupSession, PrivateCrossSigningIdentity},
};
/// An in-memory only store that will forget all the E2EE key once it's dropped.
@ -232,6 +232,13 @@ impl CryptoStore for MemoryStore {
.or_insert_with(DashSet::new)
.contains(&message_hash.hash))
}
async fn get_outbound_group_sessions(
&self,
_: &RoomId,
) -> Result<Option<OutboundGroupSession>> {
Ok(None)
}
}
#[cfg(test)]

View file

@ -86,7 +86,8 @@ use crate::{
error::SessionUnpicklingError,
identities::{Device, ReadOnlyDevice, UserDevices, UserIdentities},
olm::{
InboundGroupSession, OlmMessageHash, PrivateCrossSigningIdentity, ReadOnlyAccount, Session,
InboundGroupSession, OlmMessageHash, OutboundGroupSession, PrivateCrossSigningIdentity,
ReadOnlyAccount, Session,
},
verification::VerificationMachine,
};
@ -116,6 +117,7 @@ pub struct Changes {
pub sessions: Vec<Session>,
pub message_hashes: Vec<OlmMessageHash>,
pub inbound_group_sessions: Vec<InboundGroupSession>,
pub outbound_group_sessions: Vec<OutboundGroupSession>,
pub identities: IdentityChanges,
pub devices: DeviceChanges,
}
@ -388,6 +390,12 @@ pub trait CryptoStore: AsyncTraitDeps {
/// Get all the inbound group sessions we have stored.
async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>>;
/// Get the outobund group sessions we have stored that is used for the given room.
async fn get_outbound_group_sessions(
&self,
room_id: &RoomId,
) -> Result<Option<OutboundGroupSession>>;
/// Is the given user already tracked.
fn is_user_tracked(&self, user_id: &UserId) -> bool;

View file

@ -39,7 +39,7 @@ use super::{
};
use crate::{
identities::{ReadOnlyDevice, UserIdentities},
olm::{PickledInboundGroupSession, PrivateCrossSigningIdentity},
olm::{OutboundGroupSession, PickledInboundGroupSession, PrivateCrossSigningIdentity},
};
/// This needs to be 32 bytes long since AES-GCM requires it, otherwise we will
@ -62,6 +62,7 @@ pub struct SledStore {
olm_hashes: Tree,
sessions: Tree,
inbound_group_sessions: Tree,
outbound_group_sessions: Tree,
devices: Tree,
identities: Tree,
@ -102,6 +103,7 @@ impl SledStore {
let sessions = db.open_tree("session")?;
let inbound_group_sessions = db.open_tree("inbound_group_sessions")?;
let outbound_group_sessions = db.open_tree("outbound_group_sessions")?;
let tracked_users = db.open_tree("tracked_users")?;
let users_for_key_query = db.open_tree("users_for_key_query")?;
let olm_hashes = db.open_tree("olm_hashes")?;
@ -129,6 +131,7 @@ impl SledStore {
tracked_users_cache: DashSet::new().into(),
users_for_key_query_cache: DashSet::new().into(),
inbound_group_sessions,
outbound_group_sessions,
devices,
tracked_users,
users_for_key_query,
@ -179,6 +182,34 @@ impl SledStore {
Ok(())
}
async fn load_outbound_group_session(
&self,
room_id: &RoomId,
) -> Result<Option<OutboundGroupSession>> {
let account = self
.load_account()
.await?
.ok_or(CryptoStoreError::AccountUnset)?;
let device_id: Arc<DeviceIdBox> = account.device_id().to_owned().into();
let identity_keys = account.identity_keys;
self.outbound_group_sessions
.get(room_id.as_str())?
.map(|p| serde_json::from_slice(&p).map_err(CryptoStoreError::Serialization))
.transpose()?
.map(|p| {
OutboundGroupSession::from_pickle(
device_id,
identity_keys,
p,
self.get_pickle_mode(),
)
.map_err(CryptoStoreError::OlmGroupSession)
})
.transpose()
}
async fn save_changes(&self, changes: Changes) -> Result<()> {
let account_pickle = if let Some(a) = changes.account {
Some(a.pickle(self.get_pickle_mode()).await)
@ -218,6 +249,15 @@ impl SledStore {
inbound_session_changes.insert(key, pickle);
}
let mut outbound_session_changes = HashMap::new();
for session in changes.outbound_group_sessions {
let room_id = session.room_id();
let pickle = session.pickle(self.get_pickle_mode()).await;
outbound_session_changes.insert(room_id.clone(), pickle);
}
let identity_changes = changes.identities;
let olm_hashes = changes.message_hashes;
@ -228,6 +268,7 @@ impl SledStore {
&self.identities,
&self.sessions,
&self.inbound_group_sessions,
&self.outbound_group_sessions,
&self.olm_hashes,
)
.transaction(
@ -238,6 +279,7 @@ impl SledStore {
identities,
sessions,
inbound_sessions,
outbound_sessions,
hashes,
)| {
if let Some(a) = &account_pickle {
@ -290,6 +332,14 @@ impl SledStore {
)?;
}
for (key, session) in &outbound_session_changes {
outbound_sessions.insert(
key.as_str(),
serde_json::to_vec(&session)
.map_err(ConflictableTransactionError::Abort)?,
)?;
}
for hash in &olm_hashes {
hashes.insert(
serde_json::to_vec(&hash)
@ -503,6 +553,13 @@ impl CryptoStore for SledStore {
.olm_hashes
.contains_key(serde_json::to_vec(message_hash)?)?)
}
async fn get_outbound_group_sessions(
&self,
room_id: &RoomId,
) -> Result<Option<OutboundGroupSession>> {
self.load_outbound_group_session(room_id).await
}
}
#[cfg(test)]