From 846a0098c182272a5669bacf0d27fa988eaa4c23 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Tue, 18 Aug 2020 16:26:03 -0400 Subject: [PATCH] Split append_pdu -> append_pdu and build_and_append Move all state event appending to append_state_pdu. --- Cargo.lock | 22 ++-- src/client_server/account.rs | 2 +- src/client_server/membership.rs | 37 ++---- src/client_server/message.rs | 2 +- src/client_server/profile.rs | 4 +- src/client_server/redact.rs | 2 +- src/client_server/room.rs | 20 +-- src/client_server/state.rs | 2 +- src/database/rooms.rs | 217 ++++++++++++++++---------------- src/pdu.rs | 28 ++++- 10 files changed, 175 insertions(+), 161 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index faa9e89..3c5d836 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,9 +116,9 @@ checksum = "4af5687fe33aec5e70ef14caac5e0d363e335e5e5d6385fb75978d0c241b1d67" [[package]] name = "async-trait" -version = "0.1.37" +version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caae68055714ff28740f310927e04f2eba76ff580b16fb18ed90073ee71646f7" +checksum = "6e1a4a2f97ce50c9d0282c1468816208588441492b40d813b2e0419c22c05e7f" dependencies = [ "proc-macro2", "quote", @@ -1194,9 +1194,9 @@ checksum = "1ab52be62400ca80aa00285d25253d7f7c437b7375c4de678f5405d3afe82ca5" [[package]] name = "once_cell" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad" [[package]] name = "opaque-debug" @@ -1860,9 +1860,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.18.0" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cac94b333ee2aac3284c5b8a1b7fb4dd11cba88c244e3fe33cdbd047af0eb693" +checksum = "5d1126dcf58e93cee7d098dbda643b5f92ed724f1f6a63007c1116eed6700c81" dependencies = [ "base64 0.12.3", "log", @@ -2088,7 +2088,7 @@ checksum = "7345c971d1ef21ffdbd103a75990a15eb03604fc8b8852ca8cb418ee1a099028" [[package]] name = "state-res" version = "0.1.0" -source = "git+https://github.com/ruma/state-res#789c8140890e076d38b23fa1147c4ff0500c0d38" +source = "git+https://github.com/ruma/state-res#4e9b428c0db50ac3a3421ced12a6fd202a1c36a3" dependencies = [ "itertools", "js_int", @@ -2281,9 +2281,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53953d2d3a5ad81d9f844a32f14ebb121f50b650cd59d0ee2a07cf13c617efed" +checksum = "238ce071d267c5710f9d31451efec16c5ee22de34df17cc05e56cbc92e967117" [[package]] name = "tokio" @@ -2384,9 +2384,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fe233f4227389ab7df5b32649239da7ebe0b281824b4e84b342d04d3fd8c25e" +checksum = "80e0ccfc3378da0cce270c946b676a376943f5cd16aeba64568e7939806f4ada" dependencies = [ "proc-macro2", "quote", diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 9fa1a9c..9e52f6d 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -354,7 +354,7 @@ pub fn deactivate_route( third_party_invite: None, }; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index c04cf7f..824e871 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -1,6 +1,8 @@ use super::State; use crate::{ - client_server, pdu::PduBuilder, server_server, utils, ConduitResult, Database, Error, Ruma, + client_server, + pdu::{PduBuilder, PduEvent}, + server_server, utils, ConduitResult, Database, Error, Ruma, }; use ruma::{ api::{ @@ -142,24 +144,9 @@ pub async fn join_room_by_id_route( Error::Conflict("Found event_id in sorted events that is not in resolved state") })?; - db.rooms.append_pdu( - PduBuilder { - room_id: pdu.room_id().unwrap_or(&body.room_id).clone(), - sender: pdu.sender().clone(), - event_type: pdu.kind(), - content: pdu.content().clone(), - unsigned: Some( - pdu.unsigned() - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(), - ), - state_key: pdu.state_key(), - redacts: pdu.redacts().cloned(), - }, - &db.globals, - &db.account_data, - )?; + // We do not rebuild the PDU in this case only insert to DB + db.rooms + .append_pdu(PduEvent::try_from(pdu)?, &db.globals, &db.account_data)?; } } @@ -171,7 +158,7 @@ pub async fn join_room_by_id_route( third_party_invite: None, }; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), @@ -284,7 +271,7 @@ pub fn leave_room_route( event.membership = member::MembershipState::Leave; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), @@ -312,7 +299,7 @@ pub fn invite_user_route( let sender_id = body.sender_id.as_ref().expect("user is authenticated"); if let invite_user::InvitationRecipient::UserId { user_id } = &body.recipient { - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), @@ -369,7 +356,7 @@ pub fn kick_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; // TODO: reason - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), @@ -424,7 +411,7 @@ pub fn ban_user_route( }, )?; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), @@ -470,7 +457,7 @@ pub fn unban_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 1b461d2..03832d8 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -25,7 +25,7 @@ pub fn send_message_event_route( let mut unsigned = serde_json::Map::new(); unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into()); - let event_id = db.rooms.append_pdu( + let event_id = db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 1313db7..0707b34 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -31,7 +31,7 @@ pub fn set_displayname_route( // Send a new membership event and presence update into all joined rooms for room_id in db.rooms.rooms_joined(&sender_id) { let room_id = room_id?; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -134,7 +134,7 @@ pub fn set_avatar_url_route( // Send a new membership event and presence update into all joined rooms for room_id in db.rooms.rooms_joined(&sender_id) { let room_id = room_id?; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index fc65c23..8708692 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -18,7 +18,7 @@ pub fn redact_event_route( ) -> ConduitResult { let sender_id = body.sender_id.as_ref().expect("user is authenticated"); - let event_id = db.rooms.append_pdu( + let event_id = db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 589a2dc..3ee21b6 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -56,7 +56,7 @@ pub fn create_room_route( content.room_version = RoomVersionId::Version6; // 1. The room create event - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -71,7 +71,7 @@ pub fn create_room_route( )?; // 2. Let the room creator join - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -120,7 +120,7 @@ pub fn create_room_route( }) .expect("event is valid, we just created it") }; - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -144,7 +144,7 @@ pub fn create_room_route( }); // 4.1 Join Rules - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -169,7 +169,7 @@ pub fn create_room_route( )?; // 4.2 History Visibility - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -187,7 +187,7 @@ pub fn create_room_route( )?; // 4.3 Guest Access - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -224,7 +224,7 @@ pub fn create_room_route( continue; } - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -243,7 +243,7 @@ pub fn create_room_route( // 6. Events implied by name and topic if let Some(name) = &body.name { - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -264,7 +264,7 @@ pub fn create_room_route( } if let Some(topic) = &body.topic { - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), @@ -284,7 +284,7 @@ pub fn create_room_route( // 7. Events implied by invite (and TODO: invite_3pid) for user in &body.invite { - db.rooms.append_pdu( + db.rooms.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: sender_id.clone(), diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 14cc497..2920de2 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -57,7 +57,7 @@ pub fn send_state_event_for_key_route( } } - let event_id = db.rooms.append_pdu( + let event_id = db.rooms.build_and_append_pdu( PduBuilder { room_id: body.room_id.clone(), sender: sender_id.clone(), diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 6366c8c..0339b7f 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -4,6 +4,7 @@ pub use edus::RoomEdus; use crate::{pdu::PduBuilder, utils, Error, PduEvent, Result}; use log::error; +// TODO if ruma-signatures re-exports `use ruma::signatures::digest;` use ring::digest; use ruma::{ api::client::error::ErrorKind, @@ -99,7 +100,13 @@ impl Rooms { /// This adds all current state events (not including the incoming event) /// to `stateid_pduid` and adds the incoming event to `pduid_statehash`. /// The incoming event is the `pdu_id` passed to this method. - pub fn append_state_pdu(&self, room_id: &RoomId, pdu_id: &[u8]) -> Result { + pub fn append_state_pdu( + &self, + room_id: &RoomId, + pdu_id: &[u8], + state_key: &str, + kind: &EventType, + ) -> Result { let state_hash = self.new_state_hash_id(room_id)?; let state = self.current_state_pduids(room_id)?; @@ -123,6 +130,13 @@ impl Rooms { // will be everything up to but not including the incoming event. self.pduid_statehash.insert(pdu_id, state_hash.as_bytes())?; + let mut key = room_id.as_bytes().to_vec(); + key.push(0xff); + key.extend_from_slice(kind.to_string().as_bytes()); + key.push(0xff); + key.extend_from_slice(state_key.as_bytes()); + self.roomstateid_pduid.insert(key, pdu_id)?; + Ok(state_hash) } @@ -535,8 +549,92 @@ impl Rooms { } /// Creates a new persisted data unit and adds it to a room. - #[allow(clippy::blocks_in_if_conditions)] pub fn append_pdu( + &self, + pdu: PduEvent, + globals: &super::globals::Globals, + account_data: &super::account_data::AccountData, + ) -> Result { + let mut pdu_json = serde_json::to_value(&pdu).expect("event is valid, we just created it"); + ruma::signatures::hash_and_sign_event( + globals.server_name().as_str(), + globals.keypair(), + &mut pdu_json, + ) + .expect("event is valid, we just created it"); + + self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?; + + // Increment the last index and use that + // This is also the next_batch/since value + let index = globals.next_count()?; + + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&index.to_be_bytes()); + + self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?; + + self.eventid_pduid + .insert(pdu.event_id.as_bytes(), &*pdu_id)?; + + if let Some(state_key) = &pdu.state_key { + self.append_state_pdu(&pdu.room_id, &pdu_id, state_key, &pdu.kind)?; + } + + match pdu.kind { + EventType::RoomRedaction => { + if let Some(redact_id) = &pdu.redacts { + // TODO: Reason + let _reason = serde_json::from_value::>( + pdu.content, + ) + .expect("Raw::from_value always works.") + .deserialize() + .map_err(|_| { + Error::BadRequest( + ErrorKind::InvalidParam, + "Invalid redaction event content.", + ) + })? + .reason; + + self.redact_pdu(&redact_id)?; + } + } + EventType::RoomMember => { + if let Some(state_key) = &pdu.state_key { + // if the state_key fails + let target_user_id = UserId::try_from(state_key.as_str()) + .expect("This state_key was previously validated"); + // Update our membership info, we do this here incase a user is invited + // and immediately leaves we need the DB to record the invite event for auth + self.update_membership( + &pdu.room_id, + &target_user_id, + serde_json::from_value::(pdu.content).map_err( + |_| { + Error::BadRequest( + ErrorKind::InvalidParam, + "Invalid redaction event content.", + ) + }, + )?, + &pdu.sender, + account_data, + globals, + )?; + } + } + _ => {} + } + self.edus.room_read_set(&pdu.room_id, &pdu.sender, index)?; + + Ok(pdu.event_id) + } + + /// Creates a new persisted data unit and adds it to a room. + pub fn build_and_append_pdu( &self, pdu_builder: PduBuilder, globals: &super::globals::Globals, @@ -618,6 +716,7 @@ impl Rooms { ); // Is the event allowed? + #[allow(clippy::blocks_in_if_conditions)] if !match event_type { EventType::RoomEncryption => { // Don't allow encryption events when it's disabled @@ -687,15 +786,15 @@ impl Rooms { let mut pdu = PduEvent { event_id: EventId::try_from("$thiswillbefilledinlater").expect("we know this is valid"), - room_id: room_id.clone(), - sender: sender.clone(), + room_id, + sender, origin: globals.server_name().to_owned(), origin_server_ts: utils::millis_since_unix_epoch() .try_into() .expect("time is valid"), - kind: event_type.clone(), - content: content.clone(), - state_key: state_key.clone(), + kind: event_type, + content, + state_key, prev_events, depth: depth .try_into() @@ -704,7 +803,7 @@ impl Rooms { .into_iter() .map(|(_, pdu)| pdu.event_id) .collect(), - redacts: redacts.clone(), + redacts, unsigned, hashes: ruma::events::pdu::EventHash { sha256: "aaa".to_owned(), @@ -722,105 +821,7 @@ impl Rooms { )) .expect("ruma's reference hashes are valid event ids"); - let mut pdu_json = serde_json::to_value(&pdu).expect("event is valid, we just created it"); - ruma::signatures::hash_and_sign_event( - globals.server_name().as_str(), - globals.keypair(), - &mut pdu_json, - ) - .expect("event is valid, we just created it"); - - self.replace_pdu_leaves(&room_id, &pdu.event_id)?; - - // Increment the last index and use that - // This is also the next_batch/since value - let index = globals.next_count()?; - - let mut pdu_id = room_id.to_string().as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&index.to_be_bytes()); - - self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?; - - self.eventid_pduid - .insert(pdu.event_id.to_string(), &*pdu_id)?; - - if let Some(state_key) = &pdu.state_key { - // We call this first because our StateHash relies on the - // state before the new event - self.append_state_pdu(&room_id, &pdu_id)?; - - let mut key = room_id.as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(pdu.kind.to_string().as_bytes()); - key.push(0xff); - key.extend_from_slice(state_key.as_bytes()); - self.roomstateid_pduid.insert(key, pdu_id.as_slice())?; - } - - match event_type { - EventType::RoomRedaction => { - if let Some(redact_id) = &redacts { - // TODO: Reason - let _reason = - serde_json::from_value::>(content) - .expect("Raw::from_value always works.") - .deserialize() - .map_err(|_| { - Error::BadRequest( - ErrorKind::InvalidParam, - "Invalid redaction event content.", - ) - })? - .reason; - - self.redact_pdu(&redact_id)?; - } - } - EventType::RoomMember => { - if let Some(state_key) = state_key { - // if the state_key fails - let target_user_id = UserId::try_from(state_key) - .expect("This state_key was previously validated"); - // Update our membership info, we do this here incase a user is invited - // and immediately leaves we need the DB to record the invite event for auth - self.update_membership( - &room_id, - &target_user_id, - serde_json::from_value::(content).map_err( - |_| { - Error::BadRequest( - ErrorKind::InvalidParam, - "Invalid redaction event content.", - ) - }, - )?, - &sender, - account_data, - globals, - )?; - } - } - EventType::RoomMessage => { - if let Some(body) = content.get("body").and_then(|b| b.as_str()) { - for word in body - .split_terminator(|c: char| !c.is_alphanumeric()) - .map(str::to_lowercase) - { - let mut key = room_id.to_string().as_bytes().to_vec(); - key.push(0xff); - key.extend_from_slice(word.as_bytes()); - key.push(0xff); - key.extend_from_slice(&pdu_id); - self.tokenids.insert(key, &[])?; - } - } - } - _ => {} - } - self.edus.room_read_set(&room_id, &sender, index)?; - - Ok(pdu.event_id) + self.append_pdu(pdu, globals, account_data) } /// Returns an iterator over all PDUs in a room. @@ -999,7 +1000,7 @@ impl Rooms { if is_ignored { member_content.membership = member::MembershipState::Leave; - self.append_pdu( + self.build_and_append_pdu( PduBuilder { room_id: room_id.clone(), sender: user_id.clone(), diff --git a/src/pdu.rs b/src/pdu.rs index 5485f23..eec8e49 100644 --- a/src/pdu.rs +++ b/src/pdu.rs @@ -9,7 +9,7 @@ use ruma::{ }; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::collections::HashMap; +use std::{collections::HashMap, convert::TryFrom}; #[derive(Deserialize, Serialize)] pub struct PduEvent { @@ -177,6 +177,30 @@ impl PduEvent { } } +impl TryFrom<&state_res::StateEvent> for PduEvent { + type Error = Error; + fn try_from(pdu: &state_res::StateEvent) -> Result { + serde_json::from_value(json!({ + "event_id": pdu.event_id(), + "room_id": pdu.room_id(), + "sender": pdu.sender(), + "origin": pdu.origin(), + "origin_server_ts": pdu.origin_server_ts(), + "event_type": pdu.kind(), + "content": pdu.content(), + "state_key": pdu.state_key(), + "prev_events": pdu.prev_event_ids(), + "depth": pdu.depth(), + "auth_events": pdu.auth_events(), + "redacts": pdu.redacts(), + "unsigned": pdu.unsigned(), + "hashes": pdu.hashes(), + "signatures": pdu.signatures(), + })) + .map_err(|_| Error::bad_database("Failed to convert PDU to ruma::Pdu type.")) + } +} + impl PduEvent { pub fn convert_for_state_res(&self) -> Result { serde_json::from_value(json!({ @@ -190,11 +214,13 @@ impl PduEvent { "state_key": self.state_key, "prev_events": self.prev_events .iter() + // TODO How do we create one of these .map(|id| (id, EventHash { sha256: "hello".into() })) .collect::>(), "depth": self.depth, "auth_events": self.auth_events .iter() + // TODO How do we create one of these .map(|id| (id, EventHash { sha256: "hello".into() })) .collect::>(), "redacts": self.redacts,