From 24793891e052cfe66bde1b1fd65d75584c7c0949 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Fri, 14 May 2021 11:03:18 +0200 Subject: [PATCH 1/7] feat: implement GET /presence --- Cargo.toml | 4 +-- src/client_server/presence.rs | 51 ++++++++++++++++++++++++++++++++--- src/database/rooms/edus.rs | 41 ++++++++++++++++++++++++++++ src/main.rs | 1 + 4 files changed, 92 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 78496e4..950924a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,5 +108,5 @@ maintainer-scripts = "debian/" systemd-units = { unit-name = "matrix-conduit" } # For flamegraphs: -#[profile.release] -#debug = true +[profile.release] +debug = true diff --git a/src/client_server/presence.rs b/src/client_server/presence.rs index 175853f..9f4f7a3 100644 --- a/src/client_server/presence.rs +++ b/src/client_server/presence.rs @@ -1,10 +1,10 @@ use super::State; use crate::{utils, ConduitResult, Database, Ruma}; -use ruma::api::client::r0::presence::set_presence; -use std::convert::TryInto; +use ruma::api::client::r0::presence::{get_presence, set_presence}; +use std::{convert::TryInto, time::Duration}; #[cfg(feature = "conduit_bin")] -use rocket::put; +use rocket::{get, put}; #[cfg_attr( feature = "conduit_bin", @@ -46,3 +46,48 @@ pub async fn set_presence_route( Ok(set_presence::Response.into()) } + +#[cfg_attr( + feature = "conduit_bin", + get("/_matrix/client/r0/presence/<_>/status", data = "") +)] +#[tracing::instrument(skip(db, body))] +pub async fn get_presence_route( + db: State<'_, Database>, + body: Ruma>, +) -> ConduitResult { + let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + + let mut presence_event = None; + + for room_id in db + .rooms + .get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()]) + { + let room_id = room_id?; + + if let Some(presence) = db + .rooms + .edus + .get_last_presence_event(&sender_user, &room_id)? + { + presence_event = Some(presence); + } + } + + if let Some(presence) = presence_event { + Ok(get_presence::Response { + // TODO: Should ruma just use the presenceeventcontent type here? + status_msg: presence.content.status_msg, + currently_active: presence.content.currently_active, + last_active_ago: presence + .content + .last_active_ago + .map(|millis| Duration::from_millis(millis.into())), + presence: presence.content.presence, + } + .into()) + } else { + todo!(); + } +} diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index 56000e0..3bf2e06 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -367,6 +367,47 @@ impl RoomEdus { .transpose() } + pub fn get_last_presence_event( + &self, + user_id: &UserId, + room_id: &RoomId, + ) -> Result> { + let last_update = match self.last_presence_update(user_id)? { + Some(last) => last, + None => return Ok(None), + }; + + let mut presence_id = room_id.as_bytes().to_vec(); + presence_id.push(0xff); + presence_id.extend_from_slice(&last_update.to_be_bytes()); + presence_id.push(0xff); + presence_id.extend_from_slice(&user_id.as_bytes()); + + self.presenceid_presence + .get(presence_id)? + .map(|value| { + let mut presence = serde_json::from_slice::(&value) + .map_err(|_| Error::bad_database("Invalid presence event in db."))?; + let current_timestamp: UInt = utils::millis_since_unix_epoch() + .try_into() + .expect("time is valid"); + + if presence.content.presence == PresenceState::Online { + // Don't set last_active_ago when the user is online + presence.content.last_active_ago = None; + } else { + // Convert from timestamp to duration + presence.content.last_active_ago = presence + .content + .last_active_ago + .map(|timestamp| current_timestamp - timestamp); + } + + Ok(presence) + }) + .transpose() + } + /// Sets all users to offline who have been quiet for too long. pub fn presence_maintain( &self, diff --git a/src/main.rs b/src/main.rs index 5005a37..57eb0d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,6 +69,7 @@ fn setup_rocket(config: Figment, data: Database) -> rocket::Rocket Date: Mon, 17 May 2021 10:25:27 +0200 Subject: [PATCH 2/7] feat: send read receipts over federation currently they will only be sent if a PDU has to be sent as well --- src/client_server/sync.rs | 1 + src/database.rs | 29 +++++++++- src/database/globals.rs | 12 +++++ src/database/rooms.rs | 28 ++++++++++ src/database/rooms/edus.rs | 29 ++++++++-- src/database/sending.rs | 108 +++++++++++++++++++++++++++++++++++-- 6 files changed, 197 insertions(+), 10 deletions(-) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 66a1e13..fe6f692 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -406,6 +406,7 @@ pub async fn sync_events_route( .edus .readreceipts_since(&room_id, since)? .filter_map(|r| r.ok()) // Filter out buggy events + .map(|(_, _, v)| v) .collect::>(); if db.rooms.edus.last_typing_update(&room_id, &db.globals)? > since { diff --git a/src/database.rs b/src/database.rs index 62b3a40..6b68b9e 100644 --- a/src/database.rs +++ b/src/database.rs @@ -14,7 +14,7 @@ pub mod users; use crate::{Error, Result}; use directories::ProjectDirs; use futures::StreamExt; -use log::info; +use log::{error, info}; use rocket::futures::{self, channel::mpsc}; use ruma::{DeviceId, ServerName, UserId}; use serde::Deserialize; @@ -160,6 +160,7 @@ impl Database { tokenids: db.open_tree("tokenids")?, roomserverids: db.open_tree("roomserverids")?, + serverroomids: db.open_tree("serverroomids")?, userroomid_joined: db.open_tree("userroomid_joined")?, roomuserid_joined: db.open_tree("roomuserid_joined")?, roomuseroncejoinedids: db.open_tree("roomuseroncejoinedids")?, @@ -197,6 +198,7 @@ impl Database { userdevicetxnid_response: db.open_tree("userdevicetxnid_response")?, }, sending: sending::Sending { + servername_educount: db.open_tree("servername_educount")?, servernamepduids: db.open_tree("servernamepduids")?, servercurrentevents: db.open_tree("servercurrentevents")?, maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), @@ -217,6 +219,31 @@ impl Database { _db: db, }; + // MIGRATIONS + if db.globals.database_version()? < 1 { + for roomserverid in db.rooms.roomserverids.iter().keys() { + let roomserverid = roomserverid?; + let mut parts = roomserverid.split(|&b| b == 0xff); + let room_id = parts.next().expect("split always returns one element"); + let servername = match parts.next() { + Some(s) => s, + None => { + error!("Migration: Invalid roomserverid in db."); + continue; + } + }; + let mut serverroomid = servername.to_vec(); + serverroomid.push(0xff); + serverroomid.extend_from_slice(room_id); + + db.rooms.serverroomids.insert(serverroomid, &[])?; + } + + db.globals.bump_database_version(1)?; + + info!("Migration: 0 -> 1 finished"); + } + // This data is probably outdated db.rooms.edus.presenceid_presence.clear()?; diff --git a/src/database/globals.rs b/src/database/globals.rs index 04f8d29..c1eafe0 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -258,4 +258,16 @@ impl Globals { } Ok(response) } + + pub fn database_version(&self) -> Result { + self.globals.get("version")?.map_or(Ok(0), |version| { + utils::u64_from_bytes(&version) + .map_err(|_| Error::bad_database("Database version id is invalid.")) + }) + } + + pub fn bump_database_version(&self, new_version: u64) -> Result<()> { + self.globals.insert("version", &new_version.to_be_bytes())?; + Ok(()) + } } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index c359997..48e6e11 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -50,6 +50,8 @@ pub struct Rooms { /// Participating servers in a room. pub(super) roomserverids: sled::Tree, // RoomServerId = RoomId + ServerName + pub(super) serverroomids: sled::Tree, // ServerRoomId = ServerName + RoomId + pub(super) userroomid_joined: sled::Tree, pub(super) roomuserid_joined: sled::Tree, pub(super) roomuseroncejoinedids: sled::Tree, @@ -1597,6 +1599,10 @@ impl Rooms { roomserver_id.push(0xff); roomserver_id.extend_from_slice(user_id.server_name().as_bytes()); + let mut serverroom_id = user_id.server_name().as_bytes().to_vec(); + serverroom_id.push(0xff); + serverroom_id.extend_from_slice(room_id.as_bytes()); + let mut userroom_id = user_id.as_bytes().to_vec(); userroom_id.push(0xff); userroom_id.extend_from_slice(room_id.as_bytes()); @@ -1700,6 +1706,7 @@ impl Rooms { } self.roomserverids.insert(&roomserver_id, &[])?; + self.serverroomids.insert(&serverroom_id, &[])?; self.userroomid_joined.insert(&userroom_id, &[])?; self.roomuserid_joined.insert(&roomuser_id, &[])?; self.userroomid_invitestate.remove(&userroom_id)?; @@ -1725,6 +1732,7 @@ impl Rooms { } self.roomserverids.insert(&roomserver_id, &[])?; + self.serverroomids.insert(&serverroom_id, &[])?; self.userroomid_invitestate.insert( &userroom_id, serde_json::to_vec(&last_state.unwrap_or_default()) @@ -1745,6 +1753,7 @@ impl Rooms { .all(|u| u.server_name() != user_id.server_name()) { self.roomserverids.remove(&roomserver_id)?; + self.serverroomids.remove(&serverroom_id)?; } self.userroomid_leftstate.insert( &userroom_id, @@ -2152,6 +2161,25 @@ impl Rooms { }) } + /// Returns an iterator of all rooms a server participates in (as far as we know). + pub fn server_rooms(&self, server: &ServerName) -> impl Iterator> { + let mut prefix = server.as_bytes().to_vec(); + prefix.push(0xff); + + self.serverroomids.scan_prefix(prefix).keys().map(|key| { + Ok(RoomId::try_from( + utils::string_from_bytes( + &key? + .rsplit(|&b| b == 0xff) + .next() + .expect("rsplit always returns an element"), + ) + .map_err(|_| Error::bad_database("RoomId in serverroomids is invalid unicode."))?, + ) + .map_err(|_| Error::bad_database("RoomId in serverroomids is invalid."))?) + }) + } + /// Returns an iterator over all joined members of a room. #[tracing::instrument(skip(self))] pub fn room_members(&self, room_id: &RoomId) -> impl Iterator> { diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index 3bf2e06..89f2905 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -76,9 +76,12 @@ impl RoomEdus { &self, room_id: &RoomId, since: u64, - ) -> Result>>> { + ) -> Result< + impl Iterator)>>, + > { let mut prefix = room_id.as_bytes().to_vec(); prefix.push(0xff); + let prefix2 = prefix.clone(); let mut first_possible_edu = prefix.clone(); first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since @@ -87,14 +90,30 @@ impl RoomEdus { .readreceiptid_readreceipt .range(&*first_possible_edu..) .filter_map(|r| r.ok()) - .take_while(move |(k, _)| k.starts_with(&prefix)) - .map(|(_, v)| { + .take_while(move |(k, _)| k.starts_with(&prefix2)) + .map(move |(k, v)| { + let count = + utils::u64_from_bytes(&k[prefix.len()..prefix.len() + mem::size_of::()]) + .map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?; + let user_id = UserId::try_from( + utils::string_from_bytes(&k[prefix.len() + mem::size_of::() + 1..]) + .map_err(|_| { + Error::bad_database("Invalid readreceiptid userid bytes in db.") + })?, + ) + .map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?; + let mut json = serde_json::from_slice::(&v).map_err(|_| { Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") })?; json.remove("room_id"); - Ok(Raw::from_json( - serde_json::value::to_raw_value(&json).expect("json is valid raw value"), + + Ok(( + user_id, + count, + Raw::from_json( + serde_json::value::to_raw_value(&json).expect("json is valid raw value"), + ), )) })) } diff --git a/src/database/sending.rs b/src/database/sending.rs index e530396..199bd05 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, convert::{TryFrom, TryInto}, fmt::Debug, sync::Arc, @@ -14,8 +14,15 @@ use log::{error, warn}; use ring::digest; use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ - api::{appservice, federation, OutgoingRequest}, - events::{push_rules, EventType}, + api::{ + appservice, + federation::{ + self, + transactions::edu::{Edu, ReceiptContent, ReceiptData, ReceiptMap}, + }, + OutgoingRequest, + }, + events::{push_rules, AnySyncEphemeralRoomEvent, EventType}, push, ServerName, UInt, UserId, }; use sled::IVec; @@ -64,6 +71,7 @@ pub enum SendingEventType { #[derive(Clone)] pub struct Sending { /// The state for a given state hash. + pub(super) servername_educount: sled::Tree, // EduCount: Count of last EDU sync pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId pub(super) servercurrentevents: sled::Tree, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / (*)EduEvent pub(super) maximum_requests: Arc, @@ -194,7 +202,7 @@ impl Sending { if let sled::Event::Insert { key, .. } = event { if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key) { - if let Some(events) = Self::select_events(&outgoing_kind, vec![(event, key)], &mut current_transaction_status, &servercurrentevents, &servernamepduids) { + if let Some(events) = Self::select_events(&outgoing_kind, vec![(event, key)], &mut current_transaction_status, &servercurrentevents, &servernamepduids, &db) { futures.push(Self::handle_events(outgoing_kind, events, &db)); } } @@ -211,6 +219,7 @@ impl Sending { current_transaction_status: &mut HashMap, TransactionStatus>, servercurrentevents: &sled::Tree, servernamepduids: &sled::Tree, + db: &Database, ) -> Option> { let mut retry = false; let mut allow = true; @@ -267,11 +276,102 @@ impl Sending { events.push(e); } + + match outgoing_kind { + OutgoingKind::Normal(server_name) => { + if let Ok((select_edus, last_count)) = Self::select_edus(db, server_name) { + events.extend_from_slice(&select_edus); + db.sending + .servername_educount + .insert(server_name.as_bytes(), &last_count.to_be_bytes()) + .unwrap(); + } + } + _ => {} + } } Some(events) } + pub fn select_edus(db: &Database, server: &ServerName) -> Result<(Vec, u64)> { + // u64: count of last edu + let since = db + .sending + .servername_educount + .get(server.as_bytes())? + .map_or(Ok(0), |bytes| { + utils::u64_from_bytes(&bytes) + .map_err(|_| Error::bad_database("Invalid u64 in servername_educount.")) + })?; + let mut events = Vec::new(); + let mut max_edu_count = since; + 'outer: for room_id in db.rooms.server_rooms(server) { + let room_id = room_id?; + for r in db.rooms.edus.readreceipts_since(&room_id, since)? { + let (user_id, count, read_receipt) = r?; + + if count > max_edu_count { + max_edu_count = count; + } + + if user_id.server_name() != db.globals.server_name() { + continue; + } + + let event = + serde_json::from_str::(&read_receipt.json().get()) + .map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?; + let federation_event = match event { + AnySyncEphemeralRoomEvent::Receipt(r) => { + let mut read = BTreeMap::new(); + + let (event_id, receipt) = r + .content + .0 + .into_iter() + .next() + .expect("we only use one event per read receipt"); + let receipt = receipt + .read + .expect("our read receipts always set this") + .remove(&user_id) + .expect("our read receipts always have the user here"); + + read.insert( + user_id, + ReceiptData { + data: receipt.clone(), + event_ids: vec![event_id.clone()], + }, + ); + + let receipt_map = ReceiptMap { read }; + + let mut receipts = BTreeMap::new(); + receipts.insert(room_id.clone(), receipt_map); + + Edu::Receipt(ReceiptContent { receipts }) + } + _ => { + Error::bad_database("Invalid event type in read_receipts"); + continue; + } + }; + + events.push(SendingEventType::Edu( + serde_json::to_vec(&federation_event).expect("json can be serialized"), + )); + + if events.len() >= 20 { + break 'outer; + } + } + } + + Ok((events, max_edu_count)) + } + #[tracing::instrument(skip(self))] pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: IVec) -> Result<()> { let mut key = b"$".to_vec(); From ae41bc50677165d722ef4f7c7b3a9ee89e39d0b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Mon, 17 May 2021 10:58:44 +0200 Subject: [PATCH 3/7] fix: run state res with old current state again I'm a bit torn on the "auth check based on the current state of the room". It can mean multiple things: 1. The state of the room before the homeserver looked at the event at all. But that means if a message event from a user arrives, but we didn't see their join event before, we soft fail the message (even though we would find the join event when going through the auth events of the event and doing state res) 2. The state of the room after doing state-res with the event and our previous room state. We need to do this state resolution to find the new room state anyway, so we could just use the new room state for the auth check. The problem is that if the incoming event is a membership leave event, the new room state does not allow another leave event. This is obviously the wrong option. 3. The state of the room after doing state-res with the state **before** the event and our previous room state. This will mean a lot more calculations because we have to run state-res again We used 2. before and now use 1. again --- src/server_server.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/server_server.rs b/src/server_server.rs index 77eb448..1a1716d 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1126,9 +1126,9 @@ pub fn handle_incoming_pdu<'a>( .map_err(|_| "Failed to load room state.".to_owned())? .into_iter() .map(|(k, v)| (k, Arc::new(v))) - .collect(); + .collect::>(); - fork_states.insert(current_state); + fork_states.insert(current_state.clone()); // We also add state after incoming event to the fork states extremities.insert(incoming_pdu.event_id.clone()); @@ -1229,12 +1229,7 @@ pub fn handle_incoming_pdu<'a>( &room_version, &incoming_pdu, previous_create, - &new_room_state - .iter() - .filter_map(|(k, v)| { - Some((k.clone(), Arc::new(db.rooms.get_pdu(&v).ok().flatten()?))) - }) - .collect(), + ¤t_state, None, ) .map_err(|_e| "Auth check failed.".to_owned())?; From 09157b2096704119ccb9416ef0ae9d97872fd53f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 20 May 2021 23:46:52 +0200 Subject: [PATCH 4/7] improvement: federation get_keys and optimize signingkey storage - get encryption keys over federation - optimize signing key storage - rate limit parsing of bad events - rate limit signature fetching - dependency bumps --- Cargo.lock | 203 +++++++++++++++++-------------- Cargo.toml | 7 +- src/client_server/config.rs | 33 +++-- src/client_server/keys.rs | 155 ++++++++++++----------- src/client_server/membership.rs | 47 +++++-- src/client_server/read_marker.rs | 52 ++++---- src/client_server/sync.rs | 6 +- src/database.rs | 2 +- src/database/account_data.rs | 6 +- src/database/globals.rs | 92 +++++++------- src/database/pusher.rs | 3 +- src/database/rooms/edus.rs | 4 +- src/database/sending.rs | 31 +++-- src/database/users.rs | 29 ++--- src/main.rs | 1 + src/pdu.rs | 17 +-- src/ruma_wrapper.rs | 51 ++++---- src/server_server.rs | 198 +++++++++++++++++++++++++----- 18 files changed, 566 insertions(+), 371 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e750620..e43638d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -316,9 +316,9 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.13" +version = "0.99.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f82b1b72f1263f214c0f823371768776c4f5841b942c9883aa8e5ec584fd0ba6" +checksum = "5cc7b9cef1e351660e5443924e4f43ab25fbbed3e9a5f052df3677deb4d6b320" dependencies = [ "convert_case", "proc-macro2", @@ -470,9 +470,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d5813545e459ad3ca1bff9915e9ad7f1a47dc6a91b627ce321d5863b7dd253" +checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27" dependencies = [ "futures-channel", "futures-core", @@ -485,9 +485,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce79c6a52a299137a6013061e0cf0e688fce5d7f1bc60125f520912fdb29ec25" +checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" dependencies = [ "futures-core", "futures-sink", @@ -495,15 +495,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "098cd1c6dda6ca01650f1a37a794245eb73181d0d4d4e955e2f3c37db7af1815" +checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" [[package]] name = "futures-executor" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10f6cb7042eda00f0049b1d2080aa4b93442997ee507eb3828e8bd7577f94c9d" +checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79" dependencies = [ "futures-core", "futures-task", @@ -512,16 +512,17 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "365a1a1fb30ea1c03a830fdb2158f5236833ac81fa0ad12fe35b29cddc35cb04" +checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" [[package]] name = "futures-macro" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "668c6733a182cd7deb4f1de7ba3bf2120823835b3bcfbeacf7d2c4a773c1bb8b" +checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121" dependencies = [ + "autocfg", "proc-macro-hack", "proc-macro2", "quote", @@ -530,22 +531,23 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5629433c555de3d82861a7a4e3794a4c40040390907cfbfd7143a92a426c23" +checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282" [[package]] name = "futures-task" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba7aa51095076f3ba6d9a1f702f74bd05ec65f555d70d2033d55ba8d69f581bc" +checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae" [[package]] name = "futures-util" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c144ad54d60f23927f0a6b6d816e4271278b64f005ad65e4e35291d2de9c025" +checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" dependencies = [ + "autocfg", "futures-channel", "futures-core", "futures-io", @@ -676,9 +678,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfb77c123b4e2f72a2069aeae0b4b4949cc7e966df277813fc16347e7549737" +checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" dependencies = [ "bytes", "http", @@ -687,9 +689,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a1ce40d6fc9764887c2fdc7305c3dcc429ba11ff981c1509416afd5697e4437" +checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" [[package]] name = "httpdate" @@ -784,6 +786,15 @@ dependencies = [ "serde", ] +[[package]] +name = "indoc" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a75aeaaef0ce18b58056d306c27b07436fbb34b8816c53094b76dd81803136" +dependencies = [ + "unindent", +] + [[package]] name = "inlinable_string" version = "0.1.14" @@ -864,9 +875,9 @@ checksum = "229d53d58899083193af11e15917b5640cd40b29ff475a1fe4ef725deb02d0f2" [[package]] name = "js-sys" -version = "0.3.50" +version = "0.3.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d99f9e3e84b8f67f846ef5b4cbbc3b1c29f6c759fcbce6f01aa0e73d932a24c" +checksum = "83bdfbace3a0e81a4253f73b49e960b053e396a11012cbd49b9b74d6a2b67062" dependencies = [ "wasm-bindgen", ] @@ -1129,9 +1140,9 @@ checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" [[package]] name = "openssl-probe" -version = "0.1.2" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" +checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "opentelemetry" @@ -1614,8 +1625,8 @@ dependencies = [ [[package]] name = "ruma" -version = "0.0.3" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.1.1" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "assign", "js_int", @@ -1635,8 +1646,8 @@ dependencies = [ [[package]] name = "ruma-api" -version = "0.17.0-alpha.4" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.17.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "bytes", "http", @@ -1651,8 +1662,8 @@ dependencies = [ [[package]] name = "ruma-api-macros" -version = "0.17.0-alpha.4" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.17.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1662,8 +1673,8 @@ dependencies = [ [[package]] name = "ruma-appservice-api" -version = "0.2.0-alpha.3" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.2.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "ruma-api", "ruma-common", @@ -1676,8 +1687,8 @@ dependencies = [ [[package]] name = "ruma-client-api" -version = "0.10.0-alpha.3" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.10.1" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "assign", "bytes", @@ -1696,8 +1707,8 @@ dependencies = [ [[package]] name = "ruma-common" -version = "0.5.0" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.5.1" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "indexmap", "js_int", @@ -1711,9 +1722,10 @@ dependencies = [ [[package]] name = "ruma-events" -version = "0.22.0-alpha.3" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.22.2" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ + "indoc", "js_int", "ruma-common", "ruma-events-macros", @@ -1725,8 +1737,8 @@ dependencies = [ [[package]] name = "ruma-events-macros" -version = "0.22.0-alpha.3" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.22.2" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1736,8 +1748,8 @@ dependencies = [ [[package]] name = "ruma-federation-api" -version = "0.1.0-alpha.2" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.1.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "js_int", "ruma-api", @@ -1751,8 +1763,8 @@ dependencies = [ [[package]] name = "ruma-identifiers" -version = "0.19.0" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.19.1" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "paste", "rand", @@ -1765,8 +1777,8 @@ dependencies = [ [[package]] name = "ruma-identifiers-macros" -version = "0.19.0" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.19.1" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "quote", "ruma-identifiers-validation", @@ -1776,12 +1788,12 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.3.0" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" [[package]] name = "ruma-identity-service-api" -version = "0.1.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.1.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "js_int", "ruma-api", @@ -1793,8 +1805,8 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" -version = "0.1.0-alpha.1" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.1.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "js_int", "ruma-api", @@ -1808,8 +1820,8 @@ dependencies = [ [[package]] name = "ruma-serde" -version = "0.3.1" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.4.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "bytes", "form_urlencoded", @@ -1822,8 +1834,8 @@ dependencies = [ [[package]] name = "ruma-serde-macros" -version = "0.3.1" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.4.0" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1833,8 +1845,8 @@ dependencies = [ [[package]] name = "ruma-signatures" -version = "0.7.0" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +version = "0.7.1" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "base64 0.13.0", "ring", @@ -1847,11 +1859,12 @@ dependencies = [ [[package]] name = "ruma-state-res" version = "0.1.0" -source = "git+https://github.com/ruma/ruma?rev=71686ce8a4d1770a80de216080718fe9de7bd925#71686ce8a4d1770a80de216080718fe9de7bd925" +source = "git+https://github.com/ruma/ruma?rev=a238a0dda5b06fad146f8f01d690cbe011d13245#a238a0dda5b06fad146f8f01d690cbe011d13245" dependencies = [ "itertools 0.10.0", "js_int", "maplit", + "ruma-common", "ruma-events", "ruma-identifiers", "ruma-serde", @@ -1910,9 +1923,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd" +checksum = "61b3909d758bb75c79f23d4736fac9433868679d3ad2ea7a61e3c25cfda9a088" [[package]] name = "ryu" @@ -1992,18 +2005,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.125" +version = "1.0.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171" +checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.125" +version = "1.0.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d" +checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" dependencies = [ "proc-macro2", "quote", @@ -2361,9 +2374,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" +checksum = "bd3076b5c8cc18138b8f8814895c11eb4de37114a5d127bafdc5e55798ceef37" dependencies = [ "autocfg", "bytes", @@ -2380,9 +2393,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" +checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" dependencies = [ "proc-macro2", "quote", @@ -2402,9 +2415,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "940a12c99365c31ea8dd9ba04ec1be183ffe4920102bb7122c2f515437601e8e" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" dependencies = [ "bytes", "futures-core", @@ -2519,9 +2532,9 @@ dependencies = [ [[package]] name = "trust-dns-proto" -version = "0.20.2" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "952a078337565ba39007de99b151770f41039253a31846f0a3d5cd5a4ac8eedf" +checksum = "ad0d7f5db438199a6e2609debe3f69f808d074e0a2888ee0bccb45fe234d03f4" dependencies = [ "async-trait", "cfg-if 1.0.0", @@ -2544,9 +2557,9 @@ dependencies = [ [[package]] name = "trust-dns-resolver" -version = "0.20.2" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9c97f7d103e0f94dbe384a57908833505ae5870126492f166821b7cf685589" +checksum = "f6ad17b608a64bd0735e67bde16b0636f8aa8591f831a25d18443ed00a699770" dependencies = [ "cfg-if 1.0.0", "futures-util", @@ -2633,6 +2646,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "unindent" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" + [[package]] name = "untrusted" version = "0.7.1" @@ -2675,9 +2694,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.73" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83240549659d187488f91f33c0f8547cbfef0b2088bc470c116d1d260ef623d9" +checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" dependencies = [ "cfg-if 1.0.0", "serde", @@ -2687,9 +2706,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.73" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae70622411ca953215ca6d06d3ebeb1e915f0f6613e3b495122878d7ebec7dae" +checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900" dependencies = [ "bumpalo", "lazy_static", @@ -2702,9 +2721,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.23" +version = "0.4.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81b8b767af23de6ac18bf2168b690bed2902743ddf0fb39252e36f9e2bfc63ea" +checksum = "5fba7978c679d53ce2d0ac80c8c175840feb849a161664365d1287b41f2e67f1" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -2714,9 +2733,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.73" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e734d91443f177bfdb41969de821e15c516931c3c3db3d318fa1b68975d0f6f" +checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2724,9 +2743,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.73" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53739ff08c8a68b0fdbcd54c372b8ab800b1449ab3c9d706503bc7dd1621b2c" +checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97" dependencies = [ "proc-macro2", "quote", @@ -2737,15 +2756,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.73" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9a543ae66aa233d14bb765ed9af4a33e81b8b58d1584cf1b47ff8cd0b9e4489" +checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" [[package]] name = "web-sys" -version = "0.3.50" +version = "0.3.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a905d57e488fec8861446d3393670fb50d27a262344013181c2cdf9fff5481be" +checksum = "e828417b379f3df7111d3a2a9e5753706cae29c41f7c4029ee9fd77f3e09e582" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 950924a..2343071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,8 @@ edition = "2018" rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "801e04bd5369eb39e126c75f6d11e1e9597304d8", features = ["tls"] } # Used to handle requests # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", rev = "71686ce8a4d1770a80de216080718fe9de7bd925", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } +ruma = { git = "https://github.com/ruma/ruma", rev = "a238a0dda5b06fad146f8f01d690cbe011d13245", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } +#ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-pre-spec", "unstable-exhaustive-types"] } # Used for long polling and federation sender, should be the same as rocket::tokio tokio = "1.2.0" @@ -108,5 +109,5 @@ maintainer-scripts = "debian/" systemd-units = { unit-name = "matrix-conduit" } # For flamegraphs: -[profile.release] -debug = true +#[profile.release] +#debug = true diff --git a/src/client_server/config.rs b/src/client_server/config.rs index 07a9bc9..e7daa9e 100644 --- a/src/client_server/config.rs +++ b/src/client_server/config.rs @@ -8,11 +8,11 @@ use ruma::{ set_room_account_data, }, }, - events::{custom::CustomEventContent, AnyBasicEventContent, BasicEvent}, + events::{AnyGlobalAccountDataEventContent, AnyRoomAccountDataEventContent}, serde::Raw, }; use serde::Deserialize; -use serde_json::value::RawValue as RawJsonValue; +use serde_json::{json, value::RawValue as RawJsonValue}; #[cfg(feature = "conduit_bin")] use rocket::{get, put}; @@ -28,7 +28,7 @@ pub async fn set_global_account_data_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let data = serde_json::from_str(body.data.get()) + let data = serde_json::from_str::(body.data.get()) .map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?; let event_type = body.event_type.to_string(); @@ -37,9 +37,10 @@ pub async fn set_global_account_data_route( None, sender_user, event_type.clone().into(), - &BasicEvent { - content: CustomEventContent { event_type, data }, - }, + &json!({ + "type": event_type, + "content": data, + }), &db.globals, )?; @@ -71,9 +72,10 @@ pub async fn set_room_account_data_route( Some(&body.room_id), sender_user, event_type.clone().into(), - &BasicEvent { - content: CustomEventContent { event_type, data }, - }, + &json!({ + "type": event_type, + "content": data, + }), &db.globals, )?; @@ -99,7 +101,7 @@ pub async fn get_global_account_data_route( .ok_or(Error::BadRequest(ErrorKind::NotFound, "Data not found."))?; db.flush().await?; - let account_data = serde_json::from_str::(event.get()) + let account_data = serde_json::from_str::(event.get()) .map_err(|_| Error::bad_database("Invalid account data event in db."))? .content; @@ -130,7 +132,7 @@ pub async fn get_room_account_data_route( .ok_or(Error::BadRequest(ErrorKind::NotFound, "Data not found."))?; db.flush().await?; - let account_data = serde_json::from_str::(event.get()) + let account_data = serde_json::from_str::(event.get()) .map_err(|_| Error::bad_database("Invalid account data event in db."))? .content; @@ -138,6 +140,11 @@ pub async fn get_room_account_data_route( } #[derive(Deserialize)] -struct ExtractEventContent { - content: Raw, +struct ExtractRoomEventContent { + content: Raw, +} + +#[derive(Deserialize)] +struct ExtractGlobalEventContent { + content: Raw, } diff --git a/src/client_server/keys.rs b/src/client_server/keys.rs index aafa157..7a88fb6 100644 --- a/src/client_server/keys.rs +++ b/src/client_server/keys.rs @@ -1,5 +1,5 @@ use super::{State, SESSION_ID_LENGTH}; -use crate::{utils, ConduitResult, Database, Error, Ruma}; +use crate::{utils, ConduitResult, Database, Error, Result, Ruma}; use ruma::{ api::client::{ error::ErrorKind, @@ -12,6 +12,7 @@ use ruma::{ }, }, encryption::UnsignedDeviceInfo, + DeviceId, UserId, }; use std::collections::{BTreeMap, HashSet}; @@ -78,74 +79,14 @@ pub async fn get_keys_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); - let mut master_keys = BTreeMap::new(); - let mut self_signing_keys = BTreeMap::new(); - let mut user_signing_keys = BTreeMap::new(); - let mut device_keys = BTreeMap::new(); + let response = get_keys_helper( + Some(sender_user), + &body.device_keys, + |u| u == sender_user, + &db, + )?; - for (user_id, device_ids) in &body.device_keys { - if device_ids.is_empty() { - let mut container = BTreeMap::new(); - for device_id in db.users.all_device_ids(user_id) { - let device_id = device_id?; - if let Some(mut keys) = db.users.get_device_keys(user_id, &device_id)? { - let metadata = db - .users - .get_device_metadata(user_id, &device_id)? - .ok_or_else(|| { - Error::bad_database("all_device_keys contained nonexistent device.") - })?; - - keys.unsigned = UnsignedDeviceInfo { - device_display_name: metadata.display_name, - }; - - container.insert(device_id, keys); - } - } - device_keys.insert(user_id.clone(), container); - } else { - for device_id in device_ids { - let mut container = BTreeMap::new(); - if let Some(mut keys) = db.users.get_device_keys(&user_id.clone(), &device_id)? { - let metadata = db.users.get_device_metadata(user_id, &device_id)?.ok_or( - Error::BadRequest( - ErrorKind::InvalidParam, - "Tried to get keys for nonexistent device.", - ), - )?; - - keys.unsigned = UnsignedDeviceInfo { - device_display_name: metadata.display_name, - }; - - container.insert(device_id.clone(), keys); - } - device_keys.insert(user_id.clone(), container); - } - } - - if let Some(master_key) = db.users.get_master_key(user_id, sender_user)? { - master_keys.insert(user_id.clone(), master_key); - } - if let Some(self_signing_key) = db.users.get_self_signing_key(user_id, sender_user)? { - self_signing_keys.insert(user_id.clone(), self_signing_key); - } - if user_id == sender_user { - if let Some(user_signing_key) = db.users.get_user_signing_key(sender_user)? { - user_signing_keys.insert(user_id.clone(), user_signing_key); - } - } - } - - Ok(get_keys::Response { - master_keys, - self_signing_keys, - user_signing_keys, - device_keys, - failures: BTreeMap::new(), - } - .into()) + Ok(response.into()) } #[cfg_attr( @@ -356,3 +297,81 @@ pub async fn get_key_changes_route( } .into()) } + +pub fn get_keys_helper bool>( + sender_user: Option<&UserId>, + device_keys_input: &BTreeMap>>, + allowed_signatures: F, + db: &Database, +) -> Result { + let mut master_keys = BTreeMap::new(); + let mut self_signing_keys = BTreeMap::new(); + let mut user_signing_keys = BTreeMap::new(); + let mut device_keys = BTreeMap::new(); + + for (user_id, device_ids) in device_keys_input { + if device_ids.is_empty() { + let mut container = BTreeMap::new(); + for device_id in db.users.all_device_ids(user_id) { + let device_id = device_id?; + if let Some(mut keys) = db.users.get_device_keys(user_id, &device_id)? { + let metadata = db + .users + .get_device_metadata(user_id, &device_id)? + .ok_or_else(|| { + Error::bad_database("all_device_keys contained nonexistent device.") + })?; + + keys.unsigned = UnsignedDeviceInfo { + device_display_name: metadata.display_name, + }; + + container.insert(device_id, keys); + } + } + device_keys.insert(user_id.clone(), container); + } else { + for device_id in device_ids { + let mut container = BTreeMap::new(); + if let Some(mut keys) = db.users.get_device_keys(&user_id.clone(), &device_id)? { + let metadata = db.users.get_device_metadata(user_id, &device_id)?.ok_or( + Error::BadRequest( + ErrorKind::InvalidParam, + "Tried to get keys for nonexistent device.", + ), + )?; + + keys.unsigned = UnsignedDeviceInfo { + device_display_name: metadata.display_name, + }; + + container.insert(device_id.clone(), keys); + } + device_keys.insert(user_id.clone(), container); + } + } + + if let Some(master_key) = db.users.get_master_key(user_id, &allowed_signatures)? { + master_keys.insert(user_id.clone(), master_key); + } + if let Some(self_signing_key) = db + .users + .get_self_signing_key(user_id, &allowed_signatures)? + { + self_signing_keys.insert(user_id.clone(), self_signing_key); + } + if Some(user_id) == sender_user { + if let Some(user_signing_key) = db.users.get_user_signing_key(user_id)? { + user_signing_keys.insert(user_id.clone(), user_signing_key); + } + } + } + + Ok(get_keys::Response { + master_keys, + self_signing_keys, + user_signing_keys, + device_keys, + failures: BTreeMap::new(), + }) +} diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 63c103d..de8b4cb 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -4,7 +4,7 @@ use crate::{ pdu::{PduBuilder, PduEvent}, server_server, utils, ConduitResult, Database, Error, Result, Ruma, }; -use log::{error, warn}; +use log::{debug, error, warn}; use member::{MemberEventContent, MembershipState}; use rocket::futures; use ruma::{ @@ -29,9 +29,10 @@ use ruma::{ uint, EventId, RoomId, RoomVersionId, ServerName, UserId, }; use std::{ - collections::{BTreeMap, HashSet}, + collections::{btree_map::Entry, BTreeMap, HashSet}, convert::{TryFrom, TryInto}, sync::{Arc, RwLock}, + time::{Duration, Instant}, }; #[cfg(feature = "conduit_bin")] @@ -703,6 +704,38 @@ async fn validate_and_add_event_id( error!("{:?}: {:?}", pdu, e); Error::BadServerResponse("Invalid PDU in server response") })?; + let event_id = EventId::try_from(&*format!( + "${}", + ruma::signatures::reference_hash(&value, &room_version) + .expect("ruma can calculate reference hashes") + )) + .expect("ruma's reference hashes are valid event ids"); + + let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { + Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), + }; + + if let Some((time, tries)) = db + .globals + .bad_event_ratelimiter + .read() + .unwrap() + .get(&event_id) + { + // Exponential backoff + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + debug!("Backing off from {}", event_id); + return Err(Error::BadServerResponse("bad event, still backing off")); + } + } server_server::fetch_required_signing_keys(&value, pub_key_map, db).await?; if let Err(e) = ruma::signatures::verify_event( @@ -712,17 +745,11 @@ async fn validate_and_add_event_id( &value, room_version, ) { - warn!("Event failed verification: {}", e); + warn!("Event {} failed verification: {}", event_id, e); + back_off(event_id); return Err(Error::BadServerResponse("Event failed verification.")); } - let event_id = EventId::try_from(&*format!( - "${}", - ruma::signatures::reference_hash(&value, &room_version) - .expect("ruma can calculate reference hashes") - )) - .expect("ruma's reference hashes are valid event ids"); - value.insert( "event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned()), diff --git a/src/client_server/read_marker.rs b/src/client_server/read_marker.rs index 166e59a..1b7ea0b 100644 --- a/src/client_server/read_marker.rs +++ b/src/client_server/read_marker.rs @@ -5,12 +5,14 @@ use ruma::{ error::ErrorKind, r0::{read_marker::set_read_marker, receipt::create_receipt}, }, - events::{AnyEphemeralRoomEvent, AnyEvent, EventType}, + events::{AnyEphemeralRoomEvent, EventType}, + receipt::ReceiptType, + MilliSecondsSinceUnixEpoch, }; #[cfg(feature = "conduit_bin")] use rocket::post; -use std::{collections::BTreeMap, time::SystemTime}; +use std::collections::BTreeMap; #[cfg_attr( feature = "conduit_bin", @@ -27,7 +29,6 @@ pub async fn set_read_marker_route( content: ruma::events::fully_read::FullyReadEventContent { event_id: body.fully_read.clone(), }, - room_id: body.room_id.clone(), }; db.account_data.update( Some(&body.room_id), @@ -54,26 +55,23 @@ pub async fn set_read_marker_route( user_receipts.insert( sender_user.clone(), ruma::events::receipt::Receipt { - ts: Some(SystemTime::now()), + ts: Some(MilliSecondsSinceUnixEpoch::now()), }, ); + + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::Read, user_receipts); + let mut receipt_content = BTreeMap::new(); - receipt_content.insert( - event.to_owned(), - ruma::events::receipt::Receipts { - read: Some(user_receipts), - }, - ); + receipt_content.insert(event.to_owned(), receipts); db.rooms.edus.readreceipt_update( &sender_user, &body.room_id, - AnyEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt( - ruma::events::receipt::ReceiptEvent { - content: ruma::events::receipt::ReceiptEventContent(receipt_content), - room_id: body.room_id.clone(), - }, - )), + AnyEphemeralRoomEvent::Receipt(ruma::events::receipt::ReceiptEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + room_id: body.room_id.clone(), + }), &db.globals, )?; } @@ -112,26 +110,22 @@ pub async fn create_receipt_route( user_receipts.insert( sender_user.clone(), ruma::events::receipt::Receipt { - ts: Some(SystemTime::now()), + ts: Some(MilliSecondsSinceUnixEpoch::now()), }, ); + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::Read, user_receipts); + let mut receipt_content = BTreeMap::new(); - receipt_content.insert( - body.event_id.to_owned(), - ruma::events::receipt::Receipts { - read: Some(user_receipts), - }, - ); + receipt_content.insert(body.event_id.to_owned(), receipts); db.rooms.edus.readreceipt_update( &sender_user, &body.room_id, - AnyEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt( - ruma::events::receipt::ReceiptEvent { - content: ruma::events::receipt::ReceiptEventContent(receipt_content), - room_id: body.room_id.clone(), - }, - )), + AnyEphemeralRoomEvent::Receipt(ruma::events::receipt::ReceiptEvent { + content: ruma::events::receipt::ReceiptEventContent(receipt_content), + room_id: body.room_id.clone(), + }), &db.globals, )?; diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index fe6f692..0a27b8d 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -422,7 +422,7 @@ pub async fn sync_events_route( } let joined_room = sync_events::JoinedRoom { - account_data: sync_events::AccountData { + account_data: sync_events::RoomAccountData { events: db .account_data .changes_since(Some(&room_id), &sender_user, since)? @@ -506,7 +506,7 @@ pub async fn sync_events_route( left_rooms.insert( room_id.clone(), sync_events::LeftRoom { - account_data: sync_events::AccountData { events: Vec::new() }, + account_data: sync_events::RoomAccountData { events: Vec::new() }, timeline: sync_events::Timeline { limited: false, prev_batch: Some(next_batch.clone()), @@ -577,7 +577,7 @@ pub async fn sync_events_route( .map(|(_, v)| Raw::from(v)) .collect(), }, - account_data: sync_events::AccountData { + account_data: sync_events::GlobalAccountData { events: db .account_data .changes_since(None, &sender_user, since)? diff --git a/src/database.rs b/src/database.rs index 6b68b9e..d7126e3 100644 --- a/src/database.rs +++ b/src/database.rs @@ -213,7 +213,7 @@ impl Database { pusher: pusher::PushData::new(&db)?, globals: globals::Globals::load( db.open_tree("global")?, - db.open_tree("servertimeout_signingkey")?, + db.open_tree("server_signingkeys")?, config, )?, _db: db, diff --git a/src/database/account_data.rs b/src/database/account_data.rs index f3832ea..bb970c3 100644 --- a/src/database/account_data.rs +++ b/src/database/account_data.rs @@ -1,7 +1,7 @@ use crate::{utils, Error, Result}; use ruma::{ api::client::error::ErrorKind, - events::{AnyEvent as EduEvent, EventType}, + events::{AnyEphemeralRoomEvent, EventType}, serde::Raw, RoomId, UserId, }; @@ -80,7 +80,7 @@ impl AccountData { room_id: Option<&RoomId>, user_id: &UserId, since: u64, - ) -> Result>> { + ) -> Result>> { let mut userdata = HashMap::new(); let mut prefix = room_id @@ -110,7 +110,7 @@ impl AccountData { .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, ) .map_err(|_| Error::bad_database("RoomUserData ID in db is invalid."))?, - serde_json::from_slice::>(&v).map_err(|_| { + serde_json::from_slice::>(&v).map_err(|_| { Error::bad_database("Database contains invalid account data.") })?, )) diff --git a/src/database/globals.rs b/src/database/globals.rs index c1eafe0..0dd73b2 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -2,20 +2,22 @@ use crate::{database::Config, utils, Error, Result}; use log::{error, info}; use ruma::{ api::federation::discovery::{ServerSigningKeys, VerifyKey}, - ServerName, ServerSigningKeyId, + EventId, MilliSecondsSinceUnixEpoch, ServerName, ServerSigningKeyId, }; use rustls::{ServerCertVerifier, WebPKIVerifier}; use std::{ collections::{BTreeMap, HashMap}, sync::{Arc, RwLock}, - time::Duration, + time::{Duration, Instant}, }; +use tokio::sync::Semaphore; use trust_dns_resolver::TokioAsyncResolver; pub const COUNTER: &str = "c"; type WellKnownMap = HashMap, (String, String)>; type TlsNameMap = HashMap; +type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries #[derive(Clone)] pub struct Globals { pub actual_destination_cache: Arc>, // actual_destination, host @@ -26,7 +28,10 @@ pub struct Globals { reqwest_client: reqwest::Client, dns_resolver: TokioAsyncResolver, jwt_decoding_key: Option>, - pub(super) servertimeout_signingkey: sled::Tree, // ServerName + Timeout Timestamp -> algorithm:key + pubkey + pub(super) server_signingkeys: sled::Tree, + pub bad_event_ratelimiter: Arc>>, + pub bad_signature_ratelimiter: Arc, RateLimitState>>>, + pub servername_ratelimiter: Arc, Arc>>>, } struct MatrixServerVerifier { @@ -65,7 +70,7 @@ impl ServerCertVerifier for MatrixServerVerifier { impl Globals { pub fn load( globals: sled::Tree, - servertimeout_signingkey: sled::Tree, + server_signingkeys: sled::Tree, config: Config, ) -> Result { let bytes = &*globals @@ -135,8 +140,11 @@ impl Globals { })?, actual_destination_cache: Arc::new(RwLock::new(WellKnownMap::new())), tls_name_override, - servertimeout_signingkey, + server_signingkeys, jwt_decoding_key, + bad_event_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), + bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), + servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), }) } @@ -203,31 +211,21 @@ impl Globals { /// Remove the outdated keys and insert the new ones. /// /// This doesn't actually check that the keys provided are newer than the old set. - pub fn add_signing_key(&self, origin: &ServerName, keys: &ServerSigningKeys) -> Result<()> { - let mut key1 = origin.as_bytes().to_vec(); - key1.push(0xff); - - let mut key2 = key1.clone(); - - let ts = keys - .valid_until_ts - .duration_since(std::time::UNIX_EPOCH) - .expect("time is valid") - .as_millis() as u64; - - key1.extend_from_slice(&ts.to_be_bytes()); - key2.extend_from_slice(&(ts + 1).to_be_bytes()); - - self.servertimeout_signingkey.insert( - key1, - serde_json::to_vec(&keys.verify_keys).expect("ServerSigningKeys are a valid string"), - )?; - - self.servertimeout_signingkey.insert( - key2, - serde_json::to_vec(&keys.old_verify_keys) - .expect("ServerSigningKeys are a valid string"), - )?; + pub fn add_signing_key(&self, origin: &ServerName, new_keys: &ServerSigningKeys) -> Result<()> { + self.server_signingkeys + .update_and_fetch(origin.as_bytes(), |signingkeys| { + let mut keys = signingkeys + .and_then(|keys| serde_json::from_slice(keys).ok()) + .unwrap_or_else(|| { + // Just insert "now", it doesn't matter + ServerSigningKeys::new(origin.to_owned(), MilliSecondsSinceUnixEpoch::now()) + }); + keys.verify_keys + .extend(new_keys.verify_keys.clone().into_iter()); + keys.old_verify_keys + .extend(new_keys.old_verify_keys.clone().into_iter()); + Some(serde_json::to_vec(&keys).expect("serversigningkeys can be serialized")) + })?; Ok(()) } @@ -237,26 +235,22 @@ impl Globals { &self, origin: &ServerName, ) -> Result> { - let mut response = BTreeMap::new(); + let signingkeys = self + .server_signingkeys + .get(origin.as_bytes())? + .and_then(|bytes| serde_json::from_slice::(&bytes).ok()) + .map(|keys| { + let mut tree = keys.verify_keys; + tree.extend( + keys.old_verify_keys + .into_iter() + .map(|old| (old.0, VerifyKey::new(old.1.key))), + ); + tree + }) + .unwrap_or_else(BTreeMap::new); - let now = crate::utils::millis_since_unix_epoch(); - - for item in self.servertimeout_signingkey.scan_prefix(origin.as_bytes()) { - let (k, bytes) = item?; - let valid_until = k - .splitn(2, |&b| b == 0xff) - .nth(1) - .map(crate::utils::u64_from_bytes) - .ok_or_else(|| Error::bad_database("Invalid signing keys."))? - .map_err(|_| Error::bad_database("Invalid signing key valid until bytes"))?; - // If these keys are still valid use em! - if valid_until > now { - let btree: BTreeMap<_, _> = serde_json::from_slice(&bytes) - .map_err(|_| Error::bad_database("Invalid BTreeMap<> of signing keys"))?; - response.extend(btree); - } - } - Ok(response) + Ok(signingkeys) } pub fn database_version(&self) -> Result { diff --git a/src/database/pusher.rs b/src/database/pusher.rs index 75c2efb..51f55a1 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -294,7 +294,8 @@ async fn send_notice( } else { notifi.sender = Some(&event.sender); notifi.event_type = Some(&event.kind); - notifi.content = serde_json::value::to_raw_value(&event.content).ok(); + let content = serde_json::value::to_raw_value(&event.content).ok(); + notifi.content = content.as_deref(); if event.kind == EventType::RoomMember { notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str()); diff --git a/src/database/rooms/edus.rs b/src/database/rooms/edus.rs index 89f2905..f4c7075 100644 --- a/src/database/rooms/edus.rs +++ b/src/database/rooms/edus.rs @@ -2,7 +2,7 @@ use crate::{utils, Error, Result}; use ruma::{ events::{ presence::{PresenceEvent, PresenceEventContent}, - AnyEvent as EduEvent, SyncEphemeralRoomEvent, + AnyEphemeralRoomEvent, SyncEphemeralRoomEvent, }, presence::PresenceState, serde::Raw, @@ -32,7 +32,7 @@ impl RoomEdus { &self, user_id: &UserId, room_id: &RoomId, - event: EduEvent, + event: AnyEphemeralRoomEvent, globals: &super::super::globals::Globals, ) -> Result<()> { let mut prefix = room_id.as_bytes().to_vec(); diff --git a/src/database/sending.rs b/src/database/sending.rs index 199bd05..ed5b5ef 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -3,7 +3,7 @@ use std::{ convert::{TryFrom, TryInto}, fmt::Debug, sync::Arc, - time::{Duration, Instant, SystemTime}, + time::{Duration, Instant}, }; use crate::{ @@ -23,7 +23,9 @@ use ruma::{ OutgoingRequest, }, events::{push_rules, AnySyncEphemeralRoomEvent, EventType}, - push, ServerName, UInt, UserId, + push, + receipt::ReceiptType, + MilliSecondsSinceUnixEpoch, ServerName, UInt, UserId, }; use sled::IVec; use tokio::{select, sync::Semaphore}; @@ -277,17 +279,14 @@ impl Sending { events.push(e); } - match outgoing_kind { - OutgoingKind::Normal(server_name) => { - if let Ok((select_edus, last_count)) = Self::select_edus(db, server_name) { - events.extend_from_slice(&select_edus); - db.sending - .servername_educount - .insert(server_name.as_bytes(), &last_count.to_be_bytes()) - .unwrap(); - } + if let OutgoingKind::Normal(server_name) = outgoing_kind { + if let Ok((select_edus, last_count)) = Self::select_edus(db, server_name) { + events.extend_from_slice(&select_edus); + db.sending + .servername_educount + .insert(server_name.as_bytes(), &last_count.to_be_bytes()) + .unwrap(); } - _ => {} } } @@ -326,14 +325,14 @@ impl Sending { AnySyncEphemeralRoomEvent::Receipt(r) => { let mut read = BTreeMap::new(); - let (event_id, receipt) = r + let (event_id, mut receipt) = r .content .0 .into_iter() .next() .expect("we only use one event per read receipt"); let receipt = receipt - .read + .remove(&ReceiptType::Read) .expect("our read receipts always set this") .remove(&user_id) .expect("our read receipts always have the user here"); @@ -436,7 +435,7 @@ impl Sending { ), ) })? - .to_any_event()) + .to_room_event()) } SendingEventType::Edu(_) => { // Appservices don't need EDUs (?) @@ -610,7 +609,7 @@ impl Sending { origin: db.globals.server_name(), pdus: &pdu_jsons, edus: &edu_jsons, - origin_server_ts: SystemTime::now(), + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), transaction_id: &base64::encode_config( Self::calculate_hash( &events diff --git a/src/database/users.rs b/src/database/users.rs index 9cdfb5f..a5b8775 100644 --- a/src/database/users.rs +++ b/src/database/users.rs @@ -1,19 +1,13 @@ use crate::{utils, Error, Result}; use ruma::{ - api::client::{ - error::ErrorKind, - r0::{ - device::Device, - keys::{CrossSigningKey, OneTimeKey}, - }, - }, - encryption::DeviceKeys, + api::client::{error::ErrorKind, r0::device::Device}, + encryption::{CrossSigningKey, DeviceKeys, OneTimeKey}, events::{AnyToDeviceEvent, EventType}, identifiers::MxcUri, serde::Raw, - DeviceId, DeviceKeyAlgorithm, DeviceKeyId, UInt, UserId, + DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, UInt, UserId, }; -use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime}; +use std::{collections::BTreeMap, convert::TryFrom, mem}; #[derive(Clone)] pub struct Users { @@ -200,7 +194,7 @@ impl Users { device_id: device_id.into(), display_name: initial_device_display_name, last_seen_ip: None, // TODO - last_seen_ts: Some(SystemTime::now()), + last_seen_ts: Some(MilliSecondsSinceUnixEpoch::now()), }) .expect("Device::to_string never fails.") .as_bytes(), @@ -653,12 +647,11 @@ impl Users { }) } - pub fn get_master_key( + pub fn get_master_key bool>( &self, user_id: &UserId, - sender_id: &UserId, + allowed_signatures: F, ) -> Result> { - // TODO: hide some signatures self.userid_masterkeyid .get(user_id.to_string())? .map_or(Ok(None), |key| { @@ -673,7 +666,7 @@ impl Users { cross_signing_key.signatures = cross_signing_key .signatures .into_iter() - .filter(|(user, _)| user == user_id || user == sender_id) + .filter(|(user, _)| allowed_signatures(user)) .collect(); Ok(Some(cross_signing_key)) @@ -681,10 +674,10 @@ impl Users { }) } - pub fn get_self_signing_key( + pub fn get_self_signing_key bool>( &self, user_id: &UserId, - sender_id: &UserId, + allowed_signatures: F, ) -> Result> { self.userid_selfsigningkeyid .get(user_id.to_string())? @@ -700,7 +693,7 @@ impl Users { cross_signing_key.signatures = cross_signing_key .signatures .into_iter() - .filter(|(user, _)| user == user_id || user == sender_id) + .filter(|(user, _)| user == user_id || allowed_signatures(user)) .collect(); Ok(Some(cross_signing_key)) diff --git a/src/main.rs b/src/main.rs index 57eb0d0..7257d69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -158,6 +158,7 @@ fn setup_rocket(config: Figment, data: Database) -> rocket::Rocket Raw { + pub fn to_any_event(&self) -> Raw { let mut json = json!({ "content": self.content, "type": self.kind, @@ -267,10 +269,9 @@ impl state_res::Event for PduEvent { fn content(&self) -> serde_json::Value { self.content.clone() } - fn origin_server_ts(&self) -> std::time::SystemTime { - UNIX_EPOCH + std::time::Duration::from_millis(self.origin_server_ts.into()) + fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch { + MilliSecondsSinceUnixEpoch(self.origin_server_ts) } - fn state_key(&self) -> Option { self.state_key.clone() } diff --git a/src/ruma_wrapper.rs b/src/ruma_wrapper.rs index 9143999..d0f7303 100644 --- a/src/ruma_wrapper.rs +++ b/src/ruma_wrapper.rs @@ -34,6 +34,7 @@ pub struct Ruma { pub body: T::Incoming, pub sender_user: Option, pub sender_device: Option>, + pub sender_servername: Option>, // This is None when body is not a valid string pub json_body: Option, pub from_appservice: bool, @@ -68,7 +69,10 @@ where let mut json_body = serde_json::from_slice::(&body).ok(); - let (sender_user, sender_device, from_appservice) = if let Some((_id, registration)) = db + let (sender_user, sender_device, sender_servername, from_appservice) = if let Some(( + _id, + registration, + )) = db .appservice .iter_all() .filter_map(|r| r.ok()) @@ -104,10 +108,10 @@ where } // TODO: Check if appservice is allowed to be that user - (Some(user_id), None, true) + (Some(user_id), None, None, true) } - AuthScheme::ServerSignatures => (None, None, true), - AuthScheme::None => (None, None, true), + AuthScheme::ServerSignatures => (None, None, None, true), + AuthScheme::None => (None, None, None, true), } } else { match metadata.authentication { @@ -116,9 +120,12 @@ where match db.users.find_from_token(&token).unwrap() { // Unknown Token None => return Failure((Status::raw(581), ())), - Some((user_id, device_id)) => { - (Some(user_id), Some(Box::::from(device_id)), false) - } + Some((user_id, device_id)) => ( + Some(user_id), + Some(Box::::from(device_id)), + None, + false, + ), } } else { // Missing Token @@ -227,27 +234,24 @@ where CanonicalJsonValue::Object(signatures), ); - let keys = match server_server::fetch_signing_keys( - &db, - &origin, - vec![&key.to_owned()], - ) - .await - { - Ok(b) => b, - Err(e) => { - warn!("Failed to fetch signing keys: {}", e); + let keys = + match server_server::fetch_signing_keys(&db, &origin, vec![key.to_owned()]) + .await + { + Ok(b) => b, + Err(e) => { + warn!("Failed to fetch signing keys: {}", e); - // Forbidden - return Failure((Status::raw(580), ())); - } - }; + // Forbidden + return Failure((Status::raw(580), ())); + } + }; let mut pub_key_map = BTreeMap::new(); pub_key_map.insert(origin.as_str().to_owned(), keys); match ruma::signatures::verify_json(&pub_key_map, &request_map) { - Ok(()) => (None, None, false), + Ok(()) => (None, None, Some(origin), false), Err(e) => { warn!("Failed to verify json request from {}: {}", origin, e); @@ -260,7 +264,7 @@ where } } } - AuthScheme::None => (None, None, false), + AuthScheme::None => (None, None, None, false), } }; @@ -307,6 +311,7 @@ where body: t, sender_user, sender_device, + sender_servername, from_appservice, json_body, }), diff --git a/src/server_server.rs b/src/server_server.rs index 1a1716d..d51c9eb 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,7 +1,10 @@ -use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; +use crate::{ + client_server::{self, get_keys_helper}, + utils, ConduitResult, Database, Error, PduEvent, Result, Ruma, +}; use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; -use log::{debug, error, info, warn}; +use log::{debug, error, info, trace, warn}; use regex::Regex; use rocket::{response::content::Json, State}; use ruma::{ @@ -15,6 +18,7 @@ use ruma::{ VerifyKey, }, event::{get_event, get_missing_events, get_room_state_ids}, + keys::get_keys, membership::{ create_invite, create_join_event::{self, RoomState}, @@ -32,12 +36,14 @@ use ruma::{ create::CreateEventContent, member::{MemberEventContent, MembershipState}, }, - AnyEphemeralRoomEvent, AnyEvent as EduEvent, EventType, + AnyEphemeralRoomEvent, EventType, }, + receipt::ReceiptType, serde::Raw, signatures::{CanonicalJsonObject, CanonicalJsonValue}, state_res::{self, Event, EventMap, RoomVersion, StateMap}, - uint, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId, + uint, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, + ServerSigningKeyId, UserId, }; use std::{ collections::{btree_map::Entry, BTreeMap, BTreeSet, HashSet}, @@ -49,8 +55,9 @@ use std::{ pin::Pin, result::Result as StdResult, sync::{Arc, RwLock}, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; +use tokio::sync::Semaphore; #[cfg(feature = "conduit_bin")] use rocket::{get, post, put}; @@ -452,7 +459,10 @@ pub fn get_server_keys_route(db: State<'_, Database>) -> Json { verify_keys, old_verify_keys: BTreeMap::new(), signatures: BTreeMap::new(), - valid_until_ts: SystemTime::now() + Duration::from_secs(60 * 2), + valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time( + SystemTime::now() + Duration::from_secs(60 * 2), + ) + .expect("time is valid"), }, } .try_into_http_response::>() @@ -608,6 +618,7 @@ pub async fn send_transaction_message_route<'a>( } }; + let start_time = Instant::now(); if let Err(e) = handle_incoming_pdu( &body.origin, &event_id, @@ -619,7 +630,17 @@ pub async fn send_transaction_message_route<'a>( ) .await { - resolved_map.insert(event_id, Err(e)); + resolved_map.insert(event_id.clone(), Err(e)); + } + + let elapsed = start_time.elapsed(); + if elapsed > Duration::from_secs(1) { + warn!( + "Handling event {} took {}m{}s", + event_id, + elapsed.as_secs() / 60, + elapsed.as_secs() % 60 + ); } } @@ -653,19 +674,16 @@ pub async fn send_transaction_message_route<'a>( let mut user_receipts = BTreeMap::new(); user_receipts.insert(user_id.clone(), user_updates.data); - let mut receipt_content = BTreeMap::new(); - receipt_content.insert( - event_id.to_owned(), - ruma::events::receipt::Receipts { - read: Some(user_receipts), - }, - ); + let mut receipts = BTreeMap::new(); + receipts.insert(ReceiptType::Read, user_receipts); - let event = - EduEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt(ReceiptEvent { - content: ReceiptEventContent(receipt_content), - room_id: room_id.clone(), - })); + let mut receipt_content = BTreeMap::new(); + receipt_content.insert(event_id.to_owned(), receipts); + + let event = AnyEphemeralRoomEvent::Receipt(ReceiptEvent { + content: ReceiptEventContent(receipt_content), + room_id: room_id.clone(), + }); db.rooms.edus.readreceipt_update( &user_id, &room_id, @@ -698,6 +716,8 @@ pub async fn send_transaction_message_route<'a>( } } + info!("/send/{} done", body.transaction_id); + Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) } @@ -794,7 +814,7 @@ pub fn handle_incoming_pdu<'a>( ) { Err(e) => { // Drop - warn!("{:?}: {}", value, e); + warn!("Dropping bad event {}: {}", event_id, e); return Err("Signature verification failed".to_string()); } Ok(ruma::signatures::Verified::Signatures) => { @@ -821,6 +841,7 @@ pub fn handle_incoming_pdu<'a>( // 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events // 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events" + // EDIT: Step 5 is not applied anymore because it failed too often debug!("Fetching auth events for {}", incoming_pdu.event_id); fetch_and_handle_events( db, @@ -1292,12 +1313,30 @@ pub(crate) fn fetch_and_handle_events<'a>( auth_cache: &'a mut EventMap>, ) -> AsyncRecursiveResult<'a, Vec>, Error> { Box::pin(async move { + let back_off = |id| match db.globals.bad_event_ratelimiter.write().unwrap().entry(id) { + Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), + }; + let mut pdus = vec![]; for id in events { + if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&id) { + // Exponential backoff + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + debug!("Backing off from {}", id); + continue; + } + } // a. Look at auth cache let pdu = match auth_cache.get(id) { Some(pdu) => { - debug!("Found {} in cache", id); // We already have the auth chain for events in cache pdu.clone() } @@ -1306,7 +1345,7 @@ pub(crate) fn fetch_and_handle_events<'a>( // (get_pdu checks both) None => match db.rooms.get_pdu(&id)? { Some(pdu) => { - debug!("Found {} in db", id); + trace!("Found {} in db", id); // We need to fetch the auth chain let _ = fetch_and_handle_events( db, @@ -1331,7 +1370,7 @@ pub(crate) fn fetch_and_handle_events<'a>( .await { Ok(res) => { - debug!("Got {} over federation: {:?}", id, res); + debug!("Got {} over federation", id); let (event_id, mut value) = crate::pdu::gen_event_id_canonical_json(&res.pdu)?; // This will also fetch the auth chain @@ -1358,12 +1397,14 @@ pub(crate) fn fetch_and_handle_events<'a>( } Err(e) => { warn!("Authentication of event {} failed: {:?}", id, e); + back_off(id.clone()); continue; } } } Err(_) => { warn!("Failed to fetch event: {}", id); + back_off(id.clone()); continue; } } @@ -1383,10 +1424,67 @@ pub(crate) fn fetch_and_handle_events<'a>( pub(crate) async fn fetch_signing_keys( db: &Database, origin: &ServerName, - signature_ids: Vec<&String>, + signature_ids: Vec, ) -> Result> { let contains_all_ids = - |keys: &BTreeMap| signature_ids.iter().all(|&id| keys.contains_key(id)); + |keys: &BTreeMap| signature_ids.iter().all(|id| keys.contains_key(id)); + + let permit = db + .globals + .servername_ratelimiter + .read() + .unwrap() + .get(origin) + .map(|s| Arc::clone(s).acquire_owned()); + + let permit = match permit { + Some(p) => p, + None => { + let mut write = db.globals.servername_ratelimiter.write().unwrap(); + let s = Arc::clone( + write + .entry(origin.to_owned()) + .or_insert_with(|| Arc::new(Semaphore::new(1))), + ); + + s.acquire_owned() + } + } + .await; + + let back_off = |id| match db + .globals + .bad_signature_ratelimiter + .write() + .unwrap() + .entry(id) + { + Entry::Vacant(e) => { + e.insert((Instant::now(), 1)); + } + Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1), + }; + + if let Some((time, tries)) = db + .globals + .bad_signature_ratelimiter + .read() + .unwrap() + .get(&signature_ids) + { + // Exponential backoff + let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); + if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { + min_elapsed_duration = Duration::from_secs(60 * 60 * 24); + } + + if time.elapsed() < min_elapsed_duration { + debug!("Backing off from {:?}", signature_ids); + return Err(Error::BadServerResponse("bad signature, still backing off")); + } + } + + debug!("Loading signing keys for {}", origin); let mut result = db .globals @@ -1399,6 +1497,8 @@ pub(crate) async fn fetch_signing_keys( return Ok(result); } + debug!("Fetching signing keys for {} over federation", origin); + if let Ok(get_keys_response) = db .sending .send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new()) @@ -1436,14 +1536,17 @@ pub(crate) async fn fetch_signing_keys( &server, get_remote_server_keys::v2::Request::new( origin, - SystemTime::now() - .checked_add(Duration::from_secs(3600)) - .expect("SystemTime to large"), + MilliSecondsSinceUnixEpoch::from_system_time( + SystemTime::now() + .checked_add(Duration::from_secs(3600)) + .expect("SystemTime to large"), + ) + .expect("time is valid"), ), ) .await { - debug!("Got signing keys: {:?}", keys); + trace!("Got signing keys: {:?}", keys); for k in keys.server_keys { db.globals.add_signing_key(origin, &k)?; result.extend( @@ -1464,6 +1567,10 @@ pub(crate) async fn fetch_signing_keys( } } + drop(permit); + + back_off(signature_ids); + warn!("Failed to find public key for server: {}", origin); Err(Error::BadServerResponse( "Failed to find public key for server", @@ -1581,7 +1688,7 @@ pub fn get_event_route<'a>( Ok(get_event::v1::Response { origin: db.globals.server_name().to_owned(), - origin_server_ts: SystemTime::now(), + origin_server_ts: MilliSecondsSinceUnixEpoch::now(), pdu: PduEvent::convert_to_outgoing_federation_event( db.rooms .get_pdu_json(&body.event_id)? @@ -2186,6 +2293,34 @@ pub fn get_profile_information_route<'a>( .into()) } +#[cfg_attr( + feature = "conduit_bin", + post("/_matrix/federation/v1/user/keys/query", data = "") +)] +#[tracing::instrument(skip(db, body))] +pub fn get_keys_route<'a>( + db: State<'a, Database>, + body: Ruma, +) -> ConduitResult { + if !db.globals.allow_federation() { + return Err(Error::bad_config("Federation is disabled.")); + } + + let result = get_keys_helper( + None, + &body.device_keys, + |u| Some(u.server_name()) == body.sender_servername.as_deref(), + &db, + )?; + + Ok(get_keys::v1::Response { + device_keys: result.device_keys, + master_keys: result.master_keys, + self_signing_keys: result.self_signing_keys, + } + .into()) +} + pub async fn fetch_required_signing_keys( event: &BTreeMap, pub_key_map: &RwLock>>, @@ -2208,9 +2343,8 @@ pub async fn fetch_required_signing_keys( "Invalid signatures content object in server response pdu.", ))?; - let signature_ids = signature_object.keys().collect::>(); + let signature_ids = signature_object.keys().cloned().collect::>(); - debug!("Fetching signing keys for {}", signature_server); let fetch_res = fetch_signing_keys( db, &Box::::try_from(&**signature_server).map_err(|_| { From 989d843c40f112205e34c454940e3031e30923ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Fri, 21 May 2021 18:12:02 +0200 Subject: [PATCH 5/7] fix: unauthorized pdus will be responded to with FORBIDDEN --- src/client_server/membership.rs | 12 +++--------- src/database/rooms.rs | 2 +- src/server_server.rs | 4 ++-- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index de8b4cb..9674b7a 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -569,13 +569,7 @@ async fn join_room_by_id_helper( { let (event_id, value) = match result { Ok(t) => t, - Err(e) => { - warn!( - "PDU could not be verified: {:?} {:?} {:?}", - e, event_id, pdu - ); - continue; - } + Err(_) => continue, }; let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| { @@ -701,7 +695,7 @@ async fn validate_and_add_event_id( db: &Database, ) -> Result<(EventId, CanonicalJsonObject)> { let mut value = serde_json::from_str::(pdu.json().get()).map_err(|e| { - error!("{:?}: {:?}", pdu, e); + error!("Invalid PDU in server response: {:?}: {:?}", pdu, e); Error::BadServerResponse("Invalid PDU in server response") })?; let event_id = EventId::try_from(&*format!( @@ -745,7 +739,7 @@ async fn validate_and_add_event_id( &value, room_version, ) { - warn!("Event {} failed verification: {}", event_id, e); + warn!("Event {} failed verification {:?} {}", event_id, pdu, e); back_off(event_id); return Err(Error::BadServerResponse("Event failed verification.")); } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 48e6e11..5ba170a 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1309,7 +1309,7 @@ impl Rooms { if !auth_check { return Err(Error::BadRequest( - ErrorKind::InvalidParam, + ErrorKind::Forbidden, "Event is not authorized.", )); } diff --git a/src/server_server.rs b/src/server_server.rs index d51c9eb..699cbbe 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1484,7 +1484,7 @@ pub(crate) async fn fetch_signing_keys( } } - debug!("Loading signing keys for {}", origin); + trace!("Loading signing keys for {}", origin); let mut result = db .globals @@ -1943,7 +1943,7 @@ pub fn create_join_event_template_route<'a>( if !auth_check { return Err(Error::BadRequest( - ErrorKind::InvalidParam, + ErrorKind::Forbidden, "Event is not authorized.", )); } From 3e2f742f30506fcf0bcea82c8aa697995f265cf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Fri, 21 May 2021 22:22:05 +0200 Subject: [PATCH 6/7] fix: room version warnings and other bugs when joining rooms --- src/client_server/membership.rs | 43 +++++---------------------------- src/client_server/sync.rs | 8 ++---- src/database/rooms.rs | 5 +++- 3 files changed, 12 insertions(+), 44 deletions(-) diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 9674b7a..75d7258 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -546,12 +546,6 @@ async fn join_room_by_id_helper( ) .await?; - let count = db.globals.next_count()?; - - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&count.to_be_bytes()); - let pdu = PduEvent::from_id_val(&event_id, join_event.clone()) .map_err(|_| Error::BadServerResponse("Invalid join event PDU."))?; @@ -579,36 +573,6 @@ async fn join_room_by_id_helper( db.rooms.add_pdu_outlier(&event_id, &value)?; if let Some(state_key) = &pdu.state_key { - if pdu.kind == EventType::RoomMember { - let target_user_id = UserId::try_from(state_key.clone()).map_err(|e| { - warn!( - "Invalid user id in send_join response: {}: {}", - state_key, e - ); - Error::BadServerResponse("Invalid user id in send_join response.") - })?; - - let invite_state = Vec::new(); // TODO add a few important events - - // Update our membership info, we do this here incase a user is invited - // and immediately leaves we need the DB to record the invite event for auth - db.rooms.update_membership( - &pdu.room_id, - &target_user_id, - serde_json::from_value::( - pdu.content - .get("membership") - .ok_or(Error::BadServerResponse("Invalid member event content"))? - .clone(), - ) - .map_err(|_| { - Error::BadServerResponse("Invalid membership state content.") - })?, - &pdu.sender, - Some(invite_state), - db, - )?; - } state.insert((pdu.kind.clone(), state_key.clone()), pdu.event_id.clone()); } } @@ -648,10 +612,15 @@ async fn join_room_by_id_helper( // pdu without it's state. This is okay because append_pdu can't fail. let statehashid = db.rooms.append_to_state(&pdu, &db.globals)?; + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_pdu( &pdu, utils::to_canonical_object(&pdu).expect("Pdu is valid canonical object"), - db.globals.next_count()?, + count, pdu_id.into(), &[pdu.event_id.clone()], db, diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 0a27b8d..2b6b39e 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -103,11 +103,6 @@ pub async fn sync_events_route( // The inner Option is None when there is an event, but there is no state hash associated // with it. This can happen for the RoomCreate event, so all updates should arrive. let first_pdu_before_since = db.rooms.pdus_until(sender_user, &room_id, since).next(); - let pdus_after_since = db - .rooms - .pdus_after(sender_user, &room_id, since) - .next() - .is_some(); let since_shortstatehash = first_pdu_before_since.as_ref().map(|pdu| { db.rooms @@ -121,7 +116,7 @@ pub async fn sync_events_route( invited_member_count, joined_since_last_sync, state_events, - ) = if pdus_after_since && Some(current_shortstatehash) != since_shortstatehash { + ) = if Some(current_shortstatehash) != since_shortstatehash { let current_state = db.rooms.room_state_full(&room_id)?; let current_members = current_state .iter() @@ -224,6 +219,7 @@ pub async fn sync_events_route( device_list_updates.insert(user_id); } } + // TODO: Remove, this should never happen here, right? (MembershipState::Join, MembershipState::Leave) => { // Write down users that have left encrypted rooms we are in left_encrypted_users.insert(user_id); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 5ba170a..ede8589 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -374,7 +374,7 @@ impl Rooms { for event_id in new_state.difference(&old_state) { if let Some(pdu) = self.get_pdu_json(event_id)? { - if pdu.get("event_type").and_then(|val| val.as_str()) == Some("m.room.member") { + if pdu.get("type").and_then(|val| val.as_str()) == Some("m.room.member") { if let Ok(pdu) = serde_json::from_value::( serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"), ) { @@ -1158,6 +1158,9 @@ impl Rooms { ) -> Result>> { let mut state = Vec::new(); // Add recommended events + if let Some(e) = self.room_state_get(&invite_event.room_id, &EventType::RoomCreate, "")? { + state.push(e.to_stripped_state_event()); + } if let Some(e) = self.room_state_get(&invite_event.room_id, &EventType::RoomJoinRules, "")? { From 1b42770ab561e7e36cbb819f96d96f0075fc9dfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Sat, 22 May 2021 10:34:19 +0200 Subject: [PATCH 7/7] improvement: warning for small max_request_size values --- src/database.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/database.rs b/src/database.rs index d7126e3..76eae24 100644 --- a/src/database.rs +++ b/src/database.rs @@ -112,7 +112,9 @@ impl Database { .use_compression(true) .open()?; - info!("Opened sled database at {}", config.database_path); + if config.max_request_size < 1024 { + eprintln!("ERROR: Max request size is less than 1KB. Please increase it."); + } let (admin_sender, admin_receiver) = mpsc::unbounded();