base: Add media store

This commit is contained in:
Kévin Commaille 2021-05-25 21:16:28 +02:00
parent bdd51a323a
commit c318a6e847
No known key found for this signature in database
GPG key ID: 296D60AE1E61661C
6 changed files with 248 additions and 5 deletions

View file

@ -25,6 +25,7 @@ docs = ["encryption", "sled_cryptostore"]
[dependencies]
dashmap = "4.0.2"
lru = "0.6.5"
serde = { version = "1.0.122", features = ["rc"] }
serde_json = "1.0.61"
tracing = "0.1.22"

View file

@ -45,6 +45,7 @@ pub use crate::{
mod client;
mod error;
pub mod media;
mod rooms;
mod session;
mod store;

View file

@ -0,0 +1,90 @@
//! Common types for [media content](https://matrix.org/docs/spec/client_server/r0.6.1#id66).
use matrix_sdk_common::{
api::r0::media::get_content_thumbnail::Method, events::room::EncryptedFile,
identifiers::MxcUri, UInt,
};
const UNIQUE_SEPARATOR: &str = "_";
/// A trait to uniquely identify values of the same type.
pub trait UniqueKey {
/// A string that uniquely identifies `Self` compared to other values of
/// the same type.
fn unique_key(&self) -> String;
}
/// The requested format of a media file.
#[derive(Clone, Debug)]
pub enum MediaFormat {
/// The file that was uploaded.
File,
/// A thumbnail of the file that was uploaded.
Thumbnail(MediaThumbnailSize),
}
impl UniqueKey for MediaFormat {
fn unique_key(&self) -> String {
match self {
Self::File => "file".into(),
Self::Thumbnail(size) => size.unique_key(),
}
}
}
/// The requested size of a media thumbnail.
#[derive(Clone, Debug)]
pub struct MediaThumbnailSize {
/// The desired resizing method.
pub method: Method,
/// The desired width of the thumbnail. The actual thumbnail may not match
/// the size specified.
pub width: UInt,
/// The desired height of the thumbnail. The actual thumbnail may not match
/// the size specified.
pub height: UInt,
}
impl UniqueKey for MediaThumbnailSize {
fn unique_key(&self) -> String {
format!("{}{}{}x{}", self.method, UNIQUE_SEPARATOR, self.width, self.height)
}
}
/// A request for media data.
#[derive(Clone, Debug)]
pub enum MediaType {
/// A media content URI.
Uri(MxcUri),
/// An encrypted media content.
Encrypted(EncryptedFile),
}
impl UniqueKey for MediaType {
fn unique_key(&self) -> String {
match self {
Self::Uri(uri) => uri.to_string(),
Self::Encrypted(file) => file.url.to_string(),
}
}
}
/// A request for media data.
#[derive(Clone, Debug)]
pub struct MediaRequest {
/// The type of the media file.
pub media_type: MediaType,
/// The requested format of the media data.
pub format: MediaFormat,
}
impl UniqueKey for MediaRequest {
fn unique_key(&self) -> String {
format!("{}{}{}", self.media_type.unique_key(), UNIQUE_SEPARATOR, self.format.unique_key())
}
}

View file

@ -18,6 +18,7 @@ use std::{
};
use dashmap::{DashMap, DashSet};
use lru::LruCache;
use matrix_sdk_common::{
async_trait,
events::{
@ -27,15 +28,19 @@ use matrix_sdk_common::{
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, EventType,
},
identifiers::{EventId, RoomId, UserId},
identifiers::{EventId, MxcUri, RoomId, UserId},
instant::Instant,
locks::Mutex,
receipt::ReceiptType,
Raw,
};
use tracing::info;
use super::{Result, RoomInfo, StateChanges, StateStore};
use crate::deserialized_responses::{MemberEvent, StrippedMemberEvent};
use crate::{
deserialized_responses::{MemberEvent, StrippedMemberEvent},
media::{MediaRequest, UniqueKey},
};
#[derive(Debug, Clone)]
pub struct MemoryStore {
@ -62,6 +67,7 @@ pub struct MemoryStore {
#[allow(clippy::type_complexity)]
room_event_receipts:
Arc<DashMap<RoomId, DashMap<String, DashMap<EventId, DashMap<UserId, Receipt>>>>>,
media: Arc<Mutex<LruCache<String, Vec<u8>>>>,
}
impl MemoryStore {
@ -85,6 +91,7 @@ impl MemoryStore {
presence: DashMap::new().into(),
room_user_receipts: DashMap::new().into(),
room_event_receipts: DashMap::new().into(),
media: Arc::new(Mutex::new(LruCache::new(100))),
}
}
@ -386,6 +393,39 @@ impl MemoryStore {
})
.unwrap_or_else(Vec::new))
}
async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
self.media.lock().await.put(request.unique_key(), data);
Ok(())
}
async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>> {
Ok(self.media.lock().await.get(&request.unique_key()).cloned())
}
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
self.media.lock().await.pop(&request.unique_key());
Ok(())
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
let mut media_store = self.media.lock().await;
let keys: Vec<String> = media_store
.iter()
.filter_map(
|(key, _)| if key.starts_with(&uri.to_string()) { Some(key.clone()) } else { None },
)
.collect();
for key in keys {
media_store.pop(&key);
}
Ok(())
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@ -501,6 +541,22 @@ impl StateStore for MemoryStore {
) -> Result<Vec<(UserId, Receipt)>> {
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
}
async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
self.add_media_content(request, data).await
}
async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>> {
self.get_media_content(request).await
}
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
self.remove_media_content(request).await
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
self.remove_media_content_for_uri(uri).await
}
}
#[cfg(test)]

View file

@ -31,7 +31,7 @@ use matrix_sdk_common::{
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, EventContent, EventType,
},
identifiers::{EventId, RoomId, UserId},
identifiers::{EventId, MxcUri, RoomId, UserId},
locks::RwLock,
receipt::ReceiptType,
AsyncTraitDeps, Raw,
@ -41,6 +41,7 @@ use sled::Db;
use crate::{
deserialized_responses::{MemberEvent, StrippedMemberEvent},
media::MediaRequest,
rooms::{RoomInfo, RoomType},
Room, Session,
};
@ -249,6 +250,37 @@ pub trait StateStore: AsyncTraitDeps {
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(UserId, Receipt)>>;
/// Add a media file's content in the media store.
///
/// # Arguments
///
/// * `request` - The `MediaRequest` of the file.
///
/// * `content` - The content of the file.
async fn add_media_content(&self, request: &MediaRequest, content: Vec<u8>) -> Result<()>;
/// Get a media file's content out of the media store.
///
/// # Arguments
///
/// * `request` - The `MediaRequest` of the file.
async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>>;
/// Removes a media file's content from the media store.
///
/// # Arguments
///
/// * `request` - The `MediaRequest` of the file.
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()>;
/// Removes all the media files' content associated to an `MxcUri` from the
/// media store.
///
/// # Arguments
///
/// * `uri` - The `MxcUri` of the media files.
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()>;
}
/// A state store wrapper for the SDK.

View file

@ -34,7 +34,7 @@ use matrix_sdk_common::{
room::member::{MemberEventContent, MembershipState},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent, EventType,
},
identifiers::{EventId, RoomId, UserId},
identifiers::{EventId, MxcUri, RoomId, UserId},
receipt::ReceiptType,
Raw,
};
@ -47,7 +47,10 @@ use tracing::info;
use self::store_key::{EncryptedEvent, StoreKey};
use super::{Result, RoomInfo, StateChanges, StateStore, StoreError};
use crate::deserialized_responses::MemberEvent;
use crate::{
deserialized_responses::MemberEvent,
media::{MediaRequest, UniqueKey},
};
#[derive(Debug, Serialize, Deserialize)]
pub enum DatabaseType {
@ -185,6 +188,7 @@ pub struct SledStore {
presence: Tree,
room_user_receipts: Tree,
room_event_receipts: Tree,
media: Tree,
}
impl std::fmt::Debug for SledStore {
@ -220,6 +224,8 @@ impl SledStore {
let room_user_receipts = db.open_tree("room_user_receipts")?;
let room_event_receipts = db.open_tree("room_event_receipts")?;
let media = db.open_tree("media")?;
Ok(Self {
path,
inner: db,
@ -240,6 +246,7 @@ impl SledStore {
stripped_room_state,
room_user_receipts,
room_event_receipts,
media,
})
}
@ -721,6 +728,46 @@ impl SledStore {
})
.collect()
}
async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
self.media.insert(
(request.media_type.unique_key().as_str(), request.format.unique_key().as_str())
.encode(),
data,
)?;
Ok(())
}
async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>> {
Ok(self
.media
.get(
(request.media_type.unique_key().as_str(), request.format.unique_key().as_str())
.encode(),
)?
.map(|m| m.to_vec()))
}
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
self.media.remove(
(request.media_type.unique_key().as_str(), request.format.unique_key().as_str())
.encode(),
)?;
Ok(())
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
let keys = self.media.scan_prefix(uri.as_str().encode()).keys();
let mut batch = sled::Batch::default();
for key in keys {
batch.remove(key?);
}
Ok(self.media.apply_batch(batch)?)
}
}
#[async_trait]
@ -830,6 +877,22 @@ impl StateStore for SledStore {
) -> Result<Vec<(UserId, Receipt)>> {
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
}
async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
self.add_media_content(request, data).await
}
async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>> {
self.get_media_content(request).await
}
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
self.remove_media_content(request).await
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
self.remove_media_content_for_uri(uri).await
}
}
#[cfg(test)]