diff --git a/src/client_server.rs b/src/client_server.rs index b66993a..8627dcb 100644 --- a/src/client_server.rs +++ b/src/client_server.rs @@ -4,7 +4,7 @@ use std::{ time::{Duration, SystemTime}, }; -use log::debug; +use log::{debug, warn}; use rocket::{get, options, post, put, State}; use ruma_client_api::{ error::{Error, ErrorKind}, @@ -15,7 +15,7 @@ use ruma_client_api::{ config::{get_global_account_data, set_global_account_data}, directory::{self, get_public_rooms_filtered}, filter::{self, create_filter, get_filter}, - keys::{get_keys, upload_keys}, + keys::{claim_keys, get_keys, upload_keys}, media::get_media_config, membership::{ forget_room, get_member_events, invite_user, join_room_by_id, join_room_by_id_or_alias, @@ -33,7 +33,7 @@ use ruma_client_api::{ state::{create_state_event_for_empty_key, create_state_event_for_key}, sync::sync_events, thirdparty::get_protocols, - to_device::send_event_to_device, + to_device::{self, send_event_to_device}, typing::create_typing_event, uiaa::{AuthFlow, UiaaInfo, UiaaResponse}, user_directory::search_users, @@ -176,7 +176,8 @@ pub fn register_route( .update( None, &user_id, - EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent { + &EventType::PushRules, + serde_json::to_value(ruma_events::push_rules::PushRulesEvent { content: ruma_events::push_rules::PushRulesEventContent { global: ruma_events::push_rules::Ruleset { content: vec![], @@ -202,7 +203,8 @@ pub fn register_route( }], }, }, - }), + }) + .unwrap(), &db.globals, ) .unwrap(); @@ -353,7 +355,8 @@ pub fn set_pushrule_route( .update( None, &user_id, - EduEvent::PushRules(ruma_events::push_rules::PushRulesEvent { + &EventType::PushRules, + serde_json::to_value(ruma_events::push_rules::PushRulesEvent { content: ruma_events::push_rules::PushRulesEventContent { global: ruma_events::push_rules::Ruleset { content: vec![], @@ -379,7 +382,8 @@ pub fn set_pushrule_route( }], }, }, - }), + }) + .unwrap(), &db.globals, ) .unwrap(); @@ -422,25 +426,56 @@ pub fn create_filter_route(_user_id: String) -> MatrixResult/account_data/<_type>")] +#[put( + "/_matrix/client/r0/user/<_user_id>/account_data/<_type>", + data = "" +)] pub fn set_global_account_data_route( + db: State<'_, Database>, + body: Ruma, _user_id: String, _type: String, ) -> MatrixResult { + let user_id = body.user_id.as_ref().expect("user is authenticated"); + + db.account_data + .update( + None, + user_id, + &EventType::try_from(&body.event_type).unwrap(), + serde_json::from_str(body.data.get()).unwrap(), + &db.globals, + ) + .unwrap(); + MatrixResult(Ok(set_global_account_data::Response)) } -#[get("/_matrix/client/r0/user/<_user_id>/account_data/<_type>")] +#[get( + "/_matrix/client/r0/user/<_user_id>/account_data/<_type>", + data = "" +)] pub fn get_global_account_data_route( + db: State<'_, Database>, + body: Ruma, _user_id: String, _type: String, ) -> MatrixResult { - // TODO - MatrixResult(Err(Error { - kind: ErrorKind::NotFound, - message: "Data not found.".to_owned(), - status_code: http::StatusCode::NOT_FOUND, - })) + let user_id = body.user_id.as_ref().expect("user is authenticated"); + + if let Some(data) = db + .account_data + .get(None, user_id, &EventType::try_from(&body.event_type).unwrap()) + .unwrap() + { + MatrixResult(Ok(get_global_account_data::Response { account_data: data })) + } else { + MatrixResult(Err(Error { + kind: ErrorKind::NotFound, + message: "Data not found.".to_owned(), + status_code: http::StatusCode::NOT_FOUND, + })) + } } #[put("/_matrix/client/r0/profile/<_user_id>/displayname", data = "")] @@ -648,20 +683,93 @@ pub fn set_presence_route( MatrixResult(Ok(set_presence::Response)) } -#[post("/_matrix/client/r0/keys/query")] -pub fn get_keys_route() -> MatrixResult { - // TODO - MatrixResult(Ok(get_keys::Response { - failures: BTreeMap::new(), - device_keys: BTreeMap::new(), +#[post("/_matrix/client/r0/keys/upload", data = "")] +pub fn upload_keys_route( + db: State<'_, Database>, + body: Ruma, +) -> MatrixResult { + let user_id = body.user_id.as_ref().expect("user is authenticated"); + let device_id = body.device_id.as_ref().expect("user is authenticated"); + + if let Some(one_time_keys) = &body.one_time_keys { + for (key_key, key_value) in one_time_keys { + db.users + .add_one_time_key(user_id, device_id, key_key, key_value) + .unwrap(); + } + } + + if let Some(device_keys) = &body.device_keys { + db.users + .add_device_keys(user_id, device_id, device_keys) + .unwrap(); + } + + MatrixResult(Ok(upload_keys::Response { + one_time_key_counts: db.users.count_one_time_keys(user_id, device_id).unwrap(), })) } -#[post("/_matrix/client/r0/keys/upload")] -pub fn upload_keys_route() -> MatrixResult { - // TODO - MatrixResult(Ok(upload_keys::Response { - one_time_key_counts: BTreeMap::new(), +#[post("/_matrix/client/r0/keys/query", data = "")] +pub fn get_keys_route( + db: State<'_, Database>, + body: Ruma, +) -> MatrixResult { + let mut device_keys = BTreeMap::new(); + + for (user_id, device_ids) in &body.device_keys { + if device_ids.is_empty() { + let mut container = BTreeMap::new(); + for (device_id, keys) in db + .users + .all_device_keys(&user_id.clone()) + .map(|r| r.unwrap()) + { + container.insert(device_id, keys); + } + device_keys.insert(user_id.clone(), container); + } else { + for device_id in device_ids { + let mut container = BTreeMap::new(); + for keys in db.users.get_device_keys(&user_id.clone(), &device_id) { + container.insert(device_id.clone(), keys.unwrap()); + } + device_keys.insert(user_id.clone(), container); + } + } + } + + MatrixResult(Ok(get_keys::Response { + failures: BTreeMap::new(), + device_keys, + })) +} + +#[post("/_matrix/client/r0/keys/claim", data = "")] +pub fn claim_keys_route( + db: State<'_, Database>, + body: Ruma, +) -> MatrixResult { + let mut one_time_keys = BTreeMap::new(); + for (user_id, map) in &body.one_time_keys { + let mut container = BTreeMap::new(); + for (device_id, key_algorithm) in map { + if let Some(one_time_keys) = db + .users + .take_one_time_key(user_id, device_id, key_algorithm) + .unwrap() + { + let mut c = BTreeMap::new(); + c.insert(one_time_keys.0, one_time_keys.1); + container.insert(device_id.clone(), c); + } + } + one_time_keys.insert(user_id.clone(), container); + } + + MatrixResult(Ok(claim_keys::Response { + failures: BTreeMap::new(), + one_time_keys, })) } @@ -672,16 +780,19 @@ pub fn set_read_marker_route( _room_id: String, ) -> MatrixResult { let user_id = body.user_id.as_ref().expect("user is authenticated"); + db.account_data .update( Some(&body.room_id), &user_id, - EduEvent::FullyRead(ruma_events::fully_read::FullyReadEvent { + &EventType::FullyRead, + serde_json::to_value(ruma_events::fully_read::FullyReadEvent { content: ruma_events::fully_read::FullyReadEventContent { event_id: body.fully_read.clone(), }, room_id: Some(body.room_id.clone()), - }), + }) + .unwrap(), &db.globals, ) .unwrap(); @@ -745,7 +856,7 @@ pub fn create_typing_event_route( content: ruma_events::typing::TypingEventContent { user_ids: vec![user_id.clone()], }, - room_id: None, // None because it can be inferred + room_id: Some(body.room_id.clone()), // TODO: Can be None because it can be inferred }); if body.typing { @@ -860,7 +971,7 @@ pub fn get_alias_route( body: Ruma, _room_alias: String, ) -> MatrixResult { - // TODO + warn!("TODO: get_alias_route"); let room_id = if body.room_alias.server_name() == db.globals.server_name() { match body.room_alias.alias() { "conduit" => "!lgOCCXQKtXOAPlAlG5:conduit.rs", @@ -1092,13 +1203,13 @@ pub fn search_users_route( #[get("/_matrix/client/r0/rooms/<_room_id>/members")] pub fn get_member_events_route(_room_id: String) -> MatrixResult { - // TODO + warn!("TODO: get_member_events_route"); MatrixResult(Ok(get_member_events::Response { chunk: Vec::new() })) } #[get("/_matrix/client/r0/thirdparty/protocols")] pub fn get_protocols_route() -> MatrixResult { - // TODO + warn!("TODO: get_protocols_route"); MatrixResult(Ok(get_protocols::Response { protocols: BTreeMap::new(), })) @@ -1208,6 +1319,8 @@ pub fn sync_route( ) -> MatrixResult { std::thread::sleep(Duration::from_millis(1500)); let user_id = body.user_id.as_ref().expect("user is authenticated"); + let device_id = body.device_id.as_ref().expect("user is authenticated"); + let next_batch = db.globals.current_count().unwrap().to_string(); let mut joined_rooms = BTreeMap::new(); @@ -1277,7 +1390,7 @@ pub fn sync_route( content: ruma_events::typing::TypingEventContent { user_ids: Vec::new(), }, - room_id: None, // None because it can be inferred + room_id: Some(room_id.clone()), // None because it can be inferred }) .into(), ); @@ -1403,10 +1516,22 @@ pub fn sync_route( .global_edus .globallatests_since(since) .unwrap() - .map(|edu| { - EventJson::::from( - edu.unwrap().json().to_owned(), + .filter_map(|edu| { + // Only look for presence events + if let Ok(mut edu) = EventJson::::from( + edu.unwrap().into_json(), ) + .deserialize() + { + let timestamp = edu.content.last_active_ago.unwrap(); + edu.content.last_active_ago = Some( + js_int::UInt::try_from(utils::millis_since_unix_epoch()).unwrap() + - timestamp + ); + Some(edu.into()) + } else { + None + } }) .collect(), }, @@ -1421,7 +1546,12 @@ pub fn sync_route( }, device_lists: Default::default(), device_one_time_keys_count: Default::default(), - to_device: sync_events::ToDevice { events: Vec::new() }, + to_device: sync_events::ToDevice { + events: db + .users + .take_to_device_events(user_id, device_id, 100) + .unwrap(), + }, })) } @@ -1468,7 +1598,7 @@ pub fn get_message_events_route( #[get("/_matrix/client/r0/voip/turnServer")] pub fn turn_server_route() -> MatrixResult { - // TODO + warn!("TODO: turn_server_route"); MatrixResult(Err(Error { kind: ErrorKind::NotFound, message: "There is no turn server yet.".to_owned(), @@ -1478,7 +1608,7 @@ pub fn turn_server_route() -> MatrixResult { #[post("/_matrix/client/r0/publicised_groups")] pub fn publicised_groups_route() -> MatrixResult { - // TODO + warn!("TODO: publicised_groups_route"); MatrixResult(Err(Error { kind: ErrorKind::NotFound, message: "There are no publicised groups yet.".to_owned(), @@ -1486,18 +1616,48 @@ pub fn publicised_groups_route() -> MatrixResult })) } -#[put("/_matrix/client/r0/sendToDevice/<_event_type>/<_txn_id>")] +#[put( + "/_matrix/client/r0/sendToDevice/<_event_type>/<_txn_id>", + data = "" +)] pub fn send_event_to_device_route( + db: State<'_, Database>, + body: Ruma, _event_type: String, _txn_id: String, ) -> MatrixResult { - // TODO + let user_id = body.user_id.as_ref().expect("user is authenticated"); + + for (target_user_id, map) in &body.messages { + for (target_device_id_maybe, event) in map { + match target_device_id_maybe { + to_device::DeviceIdOrAllDevices::DeviceId(target_device_id) => db + .users + .add_to_device_event( + user_id, + &target_user_id, + &target_device_id, + &body.event_type, + serde_json::from_str(event.get()).unwrap(), + &db.globals, + ) + .unwrap(), + + to_device::DeviceIdOrAllDevices::AllDevices => { + for target_device_id in db.users.all_device_ids(&target_user_id) { + target_device_id.unwrap(); + } + } + } + } + } + MatrixResult(Ok(send_event_to_device::Response)) } #[get("/_matrix/media/r0/config")] pub fn get_media_config_route() -> MatrixResult { - // TODO + warn!("TODO: get_media_config_route"); MatrixResult(Ok(get_media_config::Response { upload_size: 0_u32.into(), })) @@ -1509,7 +1669,7 @@ pub fn options_route( ) -> MatrixResult { MatrixResult(Err(Error { kind: ErrorKind::NotFound, - message: "This is the options route.".to_owned(), + message: "".to_owned(), status_code: http::StatusCode::OK, })) } diff --git a/src/database.rs b/src/database.rs index 0bd3aa0..4cdad87 100644 --- a/src/database.rs +++ b/src/database.rs @@ -59,6 +59,9 @@ impl Database { userid_avatarurl: db.open_tree("userid_avatarurl").unwrap(), userdeviceid_token: db.open_tree("userdeviceid_token").unwrap(), token_userdeviceid: db.open_tree("token_userdeviceid").unwrap(), + onetimekeyid_onetimekeys: db.open_tree("onetimekeyid_onetimekeys").unwrap(), + userdeviceid_devicekeys: db.open_tree("userdeviceid_devicekeys").unwrap(), + todeviceid_events: db.open_tree("todeviceid_events").unwrap(), }, rooms: rooms::Rooms { edus: rooms::RoomEdus { diff --git a/src/database/account_data.rs b/src/database/account_data.rs index 7ade70c..659d3e5 100644 --- a/src/database/account_data.rs +++ b/src/database/account_data.rs @@ -1,7 +1,7 @@ -use crate::Result; -use ruma_events::{collections::only::Event as EduEvent, EventJson}; +use crate::{utils, Error, Result}; +use ruma_events::{collections::only::Event as EduEvent, EventJson, EventType}; use ruma_identifiers::{RoomId, UserId}; -use std::collections::HashMap; +use std::{collections::HashMap, convert::TryFrom}; pub struct AccountData { pub(super) roomuserdataid_accountdata: sled::Tree, // RoomUserDataId = Room + User + Count + Type @@ -13,7 +13,8 @@ impl AccountData { &self, room_id: Option<&RoomId>, user_id: &UserId, - event: EduEvent, + kind: &EventType, + data: serde_json::Value, globals: &super::globals::Globals, ) -> Result<()> { let mut prefix = room_id @@ -48,11 +49,10 @@ impl AccountData { let mut key = prefix; key.extend_from_slice(&globals.next_count()?.to_be_bytes()); key.push(0xff); - let json = serde_json::to_value(&event)?; - key.extend_from_slice(json["type"].as_str().unwrap().as_bytes()); + key.extend_from_slice(kind.to_string().as_bytes()); self.roomuserdataid_accountdata - .insert(key, &*json.to_string()) + .insert(key, &*data.to_string()) .unwrap(); Ok(()) @@ -64,7 +64,7 @@ impl AccountData { &self, room_id: Option<&RoomId>, user_id: &UserId, - kind: &str, + kind: &EventType, ) -> Result>> { Ok(self.all(room_id, user_id)?.remove(kind)) } @@ -75,7 +75,7 @@ impl AccountData { room_id: Option<&RoomId>, user_id: &UserId, since: u64, - ) -> Result>> { + ) -> Result>> { let mut userdata = HashMap::new(); let mut prefix = room_id @@ -91,17 +91,30 @@ impl AccountData { let mut first_possible = prefix.clone(); first_possible.extend_from_slice(&(since + 1).to_be_bytes()); - for json in self + for r in self .roomuserdataid_accountdata .range(&*first_possible..) .filter_map(|r| r.ok()) .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(|(_, v)| serde_json::from_slice::(&v).unwrap()) + .map(|(k, v)| { + Ok::<_, Error>(( + EventType::try_from(utils::string_from_bytes( + k.rsplit(|&b| b == 0xff) + .next() + .ok_or(Error::BadDatabase("roomuserdataid is invalid"))?, + )?) + .map_err(|_| Error::BadDatabase("roomuserdataid is invalid"))?, + serde_json::from_slice::(&v).unwrap(), + )) + }) { + let (kind, content) = r.unwrap(); + let mut json = serde_json::Map::new(); + json.insert("content".to_owned(), content); + json.insert("type".to_owned(), kind.to_string().into()); userdata.insert( - json["type"].as_str().unwrap().to_owned(), - serde_json::from_value::>(json) - .expect("userdata in db is valid"), + kind, + serde_json::from_value::>(json.into())?, ); } @@ -113,7 +126,7 @@ impl AccountData { &self, room_id: Option<&RoomId>, user_id: &UserId, - ) -> Result>> { + ) -> Result>> { self.changes_since(room_id, user_id, 0) } } diff --git a/src/database/global_edus.rs b/src/database/global_edus.rs index db44674..f665260 100644 --- a/src/database/global_edus.rs +++ b/src/database/global_edus.rs @@ -3,8 +3,8 @@ use ruma_events::{collections::only::Event as EduEvent, EventJson}; use ruma_identifiers::UserId; pub struct GlobalEdus { + //pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count pub(super) globallatestid_globallatest: sled::Tree, // Presence, GlobalLatestId = Count + UserId - //pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count } impl GlobalEdus { diff --git a/src/database/users.rs b/src/database/users.rs index 5a8b0aa..ced5fe1 100644 --- a/src/database/users.rs +++ b/src/database/users.rs @@ -1,6 +1,9 @@ use crate::{utils, Error, Result}; -use ruma_identifiers::UserId; -use std::convert::TryFrom; +use js_int::UInt; +use ruma_client_api::r0::keys::{AlgorithmAndDeviceId, DeviceKeys, KeyAlgorithm, OneTimeKey}; +use ruma_events::{to_device::AnyToDeviceEvent, EventJson, EventType}; +use ruma_identifiers::{DeviceId, UserId}; +use std::{collections::BTreeMap, convert::TryFrom}; pub struct Users { pub(super) userid_password: sled::Tree, @@ -9,6 +12,11 @@ pub struct Users { pub(super) userdeviceids: sled::Tree, pub(super) userdeviceid_token: sled::Tree, pub(super) token_userdeviceid: sled::Tree, + + pub(super) onetimekeyid_onetimekeys: sled::Tree, // OneTimeKeyId = UserId + AlgorithmAndDeviceId + pub(super) userdeviceid_devicekeys: sled::Tree, + + pub(super) todeviceid_events: sled::Tree, // ToDeviceId = UserId + DeviceId + Count } impl Users { @@ -96,7 +104,7 @@ impl Users { } /// Adds a new device to a user. - pub fn create_device(&self, user_id: &UserId, device_id: &str, token: &str) -> Result<()> { + pub fn create_device(&self, user_id: &UserId, device_id: &DeviceId, token: &str) -> Result<()> { if !self.exists(user_id)? { return Err(Error::BadRequest( "tried to create device for nonexistent user", @@ -114,8 +122,22 @@ impl Users { Ok(()) } + /// Returns an iterator over all device ids of this user. + pub fn all_device_ids(&self, user_id: &UserId) -> impl Iterator> { + let mut prefix = user_id.to_string().as_bytes().to_vec(); + prefix.push(0xff); + self.userdeviceids.scan_prefix(prefix).keys().map(|bytes| { + Ok(utils::string_from_bytes( + &*bytes? + .rsplit(|&b| b == 0xff) + .next() + .ok_or(Error::BadDatabase("userdeviceid is invalid"))?, + )?) + }) + } + /// Replaces the access token of one device. - pub fn set_token(&self, user_id: &UserId, device_id: &str, token: &str) -> Result<()> { + pub fn set_token(&self, user_id: &UserId, device_id: &DeviceId, token: &str) -> Result<()> { let mut userdeviceid = user_id.to_string().as_bytes().to_vec(); userdeviceid.push(0xff); userdeviceid.extend_from_slice(device_id.as_bytes()); @@ -138,4 +160,196 @@ impl Users { Ok(()) } + + pub fn add_one_time_key( + &self, + user_id: &UserId, + device_id: &DeviceId, + one_time_key_key: &AlgorithmAndDeviceId, + one_time_key_value: &OneTimeKey, + ) -> Result<()> { + let mut key = user_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(device_id.as_bytes()); + + if self.userdeviceids.get(&key)?.is_none() { + return Err(Error::BadRequest( + "Tried to set token for nonexistent device", + )); + } + + key.push(0xff); + // TODO: Use AlgorithmAndDeviceId::to_string when it's available (and update everything, + // because there are no wrapping quotation marks anymore) + key.extend_from_slice(&serde_json::to_string(one_time_key_key)?.as_bytes()); + + self.onetimekeyid_onetimekeys + .insert(&key, &*serde_json::to_string(&one_time_key_value)?)?; + + Ok(()) + } + + pub fn take_one_time_key( + &self, + user_id: &UserId, + device_id: &DeviceId, + key_algorithm: &KeyAlgorithm, + ) -> Result> { + let mut prefix = user_id.to_string().as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(device_id.as_bytes()); + prefix.push(0xff); + prefix.push(b'"'); // Annoying quotation mark + prefix.extend_from_slice(key_algorithm.to_string().as_bytes()); + prefix.push(b':'); + + self.onetimekeyid_onetimekeys + .scan_prefix(&prefix) + .next() + .map(|r| { + let (key, value) = r?; + Ok(( + serde_json::from_slice( + &*key + .rsplit(|&b| b == 0xff) + .next() + .ok_or(Error::BadDatabase("onetimekeyid is invalid"))?, + )?, + serde_json::from_slice(&*value)?, + )) + }) + .transpose() + } + + pub fn count_one_time_keys( + &self, + user_id: &UserId, + device_id: &DeviceId, + ) -> Result> { + let mut userdeviceid = user_id.to_string().as_bytes().to_vec(); + userdeviceid.push(0xff); + userdeviceid.extend_from_slice(device_id.as_bytes()); + + let mut counts = BTreeMap::new(); + + for algorithm in self + .onetimekeyid_onetimekeys + .scan_prefix(&userdeviceid) + .keys() + .map(|bytes| { + Ok::<_, Error>( + serde_json::from_slice::( + &*bytes? + .rsplit(|&b| b == 0xff) + .next() + .ok_or(Error::BadDatabase("onetimekeyid is invalid"))?, + )? + .0, + ) + }) + { + *counts.entry(algorithm?).or_default() += UInt::from(1_u32); + } + + Ok(counts) + } + + pub fn add_device_keys( + &self, + user_id: &UserId, + device_id: &DeviceId, + device_keys: &DeviceKeys, + ) -> Result<()> { + let mut userdeviceid = user_id.to_string().as_bytes().to_vec(); + userdeviceid.push(0xff); + userdeviceid.extend_from_slice(device_id.as_bytes()); + + self.userdeviceid_devicekeys + .insert(&userdeviceid, &*serde_json::to_string(&device_keys)?)?; + + Ok(()) + } + + pub fn get_device_keys( + &self, + user_id: &UserId, + device_id: &DeviceId, + ) -> impl Iterator> { + let mut key = user_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(device_id.as_bytes()); + + self.userdeviceid_devicekeys + .scan_prefix(key) + .values() + .map(|bytes| Ok(serde_json::from_slice(&bytes?)?)) + } + + pub fn all_device_keys( + &self, + user_id: &UserId, + ) -> impl Iterator> { + let mut key = user_id.to_string().as_bytes().to_vec(); + key.push(0xff); + + self.userdeviceid_devicekeys.scan_prefix(key).map(|r| { + let (key, value) = r?; + Ok(( + utils::string_from_bytes( + key.rsplit(|&b| b == 0xff) + .next() + .ok_or(Error::BadDatabase("userdeviceid is invalid"))?, + )?, + serde_json::from_slice(&*value)?, + )) + }) + } + + pub fn add_to_device_event( + &self, + sender: &UserId, + target_user_id: &UserId, + target_device_id: &DeviceId, + event_type: &EventType, + content: serde_json::Value, + globals: &super::globals::Globals, + ) -> Result<()> { + let mut key = target_user_id.to_string().as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(target_device_id.as_bytes()); + key.push(0xff); + key.extend_from_slice(&globals.next_count()?.to_be_bytes()); + + let mut json = serde_json::Map::new(); + json.insert("type".to_owned(), event_type.to_string().into()); + json.insert("sender".to_owned(), sender.to_string().into()); + json.insert("content".to_owned(), content); + + self.todeviceid_events + .insert(&key, &*serde_json::to_string(&json)?)?; + + Ok(()) + } + + pub fn take_to_device_events( + &self, + user_id: &UserId, + device_id: &DeviceId, + max: usize, + ) -> Result>> { + let mut events = Vec::new(); + + let mut prefix = user_id.to_string().as_bytes().to_vec(); + prefix.push(0xff); + prefix.extend_from_slice(device_id.as_bytes()); + prefix.push(0xff); + + for result in self.todeviceid_events.scan_prefix(&prefix).take(max) { + let (key, value) = result?; + events.push(serde_json::from_slice(&*value)?); + self.todeviceid_events.remove(key)?; + } + + Ok(events) + } } diff --git a/src/main.rs b/src/main.rs index 514a8dd..717ef96 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,8 +43,9 @@ fn setup_rocket() -> rocket::Rocket { client_server::get_avatar_url_route, client_server::get_profile_route, client_server::set_presence_route, - client_server::get_keys_route, client_server::upload_keys_route, + client_server::get_keys_route, + client_server::claim_keys_route, client_server::set_read_marker_route, client_server::create_typing_event_route, client_server::create_room_route, diff --git a/src/server_server.rs b/src/server_server.rs index 2fcbe98..84ca5cc 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -166,7 +166,12 @@ pub fn get_server_keys(db: State<'_, Database>) -> Json { .body(), ) .unwrap(); - ruma_signatures::sign_json(db.globals.server_name(), db.globals.keypair(), &mut response).unwrap(); + ruma_signatures::sign_json( + db.globals.server_name(), + db.globals.keypair(), + &mut response, + ) + .unwrap(); Json(response.to_string()) }