From 73124629b7d4cdbddef36c52bfe5e494bad1ac01 Mon Sep 17 00:00:00 2001 From: Devin Ragotzy Date: Fri, 29 Jan 2021 10:14:09 -0500 Subject: [PATCH] Add general rules matching for pusher, calc unread msgs --- Cargo.lock | 16 ++ Cargo.toml | 2 +- src/database/pusher.rs | 412 +++++++++++++++++++++++++++++++++++++--- src/database/sending.rs | 129 +++++++------ src/server_server.rs | 7 +- 5 files changed, 469 insertions(+), 97 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66f624c..859d854 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1534,6 +1534,7 @@ dependencies = [ "ruma-events", "ruma-federation-api", "ruma-identifiers", + "ruma-push-gateway-api", "ruma-serde", "ruma-signatures", ] @@ -1680,6 +1681,21 @@ name = "ruma-identifiers-validation" version = "0.2.0" source = "git+https://github.com/ruma/ruma?rev=bba442580d6cd7ed990b2b63387eed2238cbadc8#bba442580d6cd7ed990b2b63387eed2238cbadc8" +[[package]] +name = "ruma-push-gateway-api" +version = "0.0.1" +source = "git+https://github.com/ruma/ruma?rev=bba442580d6cd7ed990b2b63387eed2238cbadc8#bba442580d6cd7ed990b2b63387eed2238cbadc8" +dependencies = [ + "js_int", + "ruma-api", + "ruma-common", + "ruma-events", + "ruma-identifiers", + "ruma-serde", + "serde", + "serde_json", +] + [[package]] name = "ruma-serde" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 2c6c741..a8760c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "c24f15c18 #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } # Used for matrix spec type definitions and helpers -ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "bba442580d6cd7ed990b2b63387eed2238cbadc8" } +ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "bba442580d6cd7ed990b2b63387eed2238cbadc8" } # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" } # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } diff --git a/src/database/pusher.rs b/src/database/pusher.rs index 041085d..a1a6130 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -1,17 +1,26 @@ -use crate::{Error, PduEvent, Result}; +use crate::{Database, Error, PduEvent, Result}; +use log::{error, info, warn}; use ruma::{ - api::client::r0::push::{Pusher, PusherKind}, - events::{ - room::{ - member::MemberEventContent, - message::{MessageEventContent, TextMessageEventContent}, + api::{ + client::r0::push::{Pusher, PusherKind}, + push_gateway::send_event_notification::{ + self, + v1::{Device, Notification, NotificationCounts, NotificationPriority}, }, - EventType, + OutgoingRequest, }, - push::{PushCondition, Ruleset}, - UserId, + events::room::{ + member::{MemberEventContent, MembershipState}, + message::{MessageEventContent, TextMessageEventContent}, + power_levels::PowerLevelsEventContent, + }, + events::EventType, + push::{Action, PushCondition, PushFormat, Ruleset, Tweak}, + uint, UInt, UserId, }; +use std::{convert::TryFrom, fmt::Debug, time::Duration}; + #[derive(Debug, Clone)] pub struct PushData { /// UserId + pushkey -> Pusher @@ -29,6 +38,15 @@ impl PushData { let mut key = sender.as_bytes().to_vec(); key.extend_from_slice(pusher.pushkey.as_bytes()); + // There are 2 kinds of pushers but the spec says: null deletes the pusher. + if pusher.kind.is_none() { + return self + .senderkey_pusher + .remove(key) + .map(|_| ()) + .map_err(Into::into); + } + self.senderkey_pusher.insert( key, &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"), @@ -46,15 +64,95 @@ impl PushData { Ok(serde_json::from_slice(&*push) .map_err(|_| Error::bad_database("Invalid Pusher in db."))?) }) - .collect::>>() + .collect() + } +} + +pub async fn send_request( + globals: &crate::database::globals::Globals, + destination: &str, + request: T, +) -> Result +where + T: Debug, +{ + let destination = destination.replace("/_matrix/push/v1/notify", ""); + + let http_request = request + .try_into_http_request(&destination, Some("")) + .map_err(|e| { + warn!("Failed to find destination {}: {}", destination, e); + Error::BadServerResponse("Invalid destination") + })?; + + let mut reqwest_request = reqwest::Request::try_from(http_request) + .expect("all http requests are valid reqwest requests"); + + // TODO: we could keep this very short and let expo backoff do it's thing... + *reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); + + let url = reqwest_request.url().clone(); + let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; + + // Because reqwest::Response -> http::Response is complicated: + match reqwest_response { + Ok(mut reqwest_response) => { + let status = reqwest_response.status(); + let mut http_response = http::Response::builder().status(status); + let headers = http_response.headers_mut().unwrap(); + + for (k, v) in reqwest_response.headers_mut().drain() { + if let Some(key) = k { + headers.insert(key, v); + } + } + + let status = reqwest_response.status(); + + let body = reqwest_response + .bytes() + .await + .unwrap_or_else(|e| { + warn!("server error {}", e); + Vec::new().into() + }) // TODO: handle timeout + .into_iter() + .collect::>(); + + if status != 200 { + info!( + "Push gateway returned bad response {} {}\n{}\n{:?}", + destination, + status, + url, + crate::utils::string_from_bytes(&body) + ); + } + + let response = T::IncomingResponse::try_from( + http_response + .body(body) + .expect("reqwest body is valid http body"), + ); + response.map_err(|_| { + info!( + "Push gateway returned invalid response bytes {}\n{}", + destination, url + ); + Error::BadServerResponse("Push gateway returned bad response.") + }) + } + Err(e) => Err(e.into()), } } pub async fn send_push_notice( user: &UserId, - pusher: &Pusher, + unread: UInt, + pushers: &[Pusher], ruleset: Ruleset, pdu: &PduEvent, + db: &Database, ) -> Result<()> { for rule in ruleset.into_iter() { // TODO: can actions contain contradictory Actions @@ -69,8 +167,44 @@ pub async fn send_push_notice( match rule.rule_id.as_str() { ".m.rule.master" => {} - ".m.rule.suppress_notices" => {} - ".m.rule.invite_for_me" => {} + ".m.rule.suppress_notices" => { + if pdu.kind == EventType::RoomMessage + && pdu + .content + .get("msgtype") + .map_or(false, |ty| ty == "m.notice") + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + ".m.rule.invite_for_me" => { + if let EventType::RoomMember = &pdu.kind { + if pdu.state_key.as_deref() == Some(user.as_str()) + && serde_json::from_value::(pdu.content.clone()) + .map_err(|_| Error::bad_database("PDU contained bad message content"))? + .membership + == MembershipState::Invite + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + } ".m.rule.member_event" => { if let EventType::RoomMember = &pdu.kind { // TODO use this? @@ -88,7 +222,17 @@ pub async fn send_push_notice( json.to_string().contains(pattern) } _ => false, - }) {} + }) { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } } } } @@ -103,13 +247,71 @@ pub async fn send_push_notice( &msg_content { if body.contains(user.localpart()) { - send_notice(user, &pusher, &pdu).await?; + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + } + } + ".m.rule.tombstone" => { + if pdu.kind == EventType::RoomTombstone && pdu.state_key.as_deref() == Some("") { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + ".m.rule.roomnotif" => { + if let EventType::RoomMessage = &pdu.kind { + let msg_content = + serde_json::from_value::(pdu.content.clone()) + .map_err(|_| { + Error::bad_database("PDU contained bad message content") + })?; + if let MessageEventContent::Text(TextMessageEventContent { body, .. }) = + &msg_content + { + let power_level_cmp = |pl: PowerLevelsEventContent| { + &pl.notifications.room + <= pl.users.get(&pdu.sender).unwrap_or(&ruma::int!(0)) + }; + let deserialize = |pl: PduEvent| { + serde_json::from_value::(pl.content).ok() + }; + if body.contains("@room") + && db + .rooms + .room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")? + .map(|(_, pl)| pl) + .map(deserialize) + .flatten() + .map_or(false, power_level_cmp) + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; } } } } - ".m.rule.tombstone" => {} - ".m.rule.roomnotif" => {} ".m.rule.contains_user_name" => { if let EventType::RoomMessage = &pdu.kind { let msg_content = @@ -121,28 +323,180 @@ pub async fn send_push_notice( &msg_content { if body.contains(user.localpart()) { - send_notice(user, &pusher, &pdu).await?; + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; } } } } - ".m.rule.call" => {} - ".m.rule.encrypted_room_one_to_one" => {} - ".m.rule.room_one_to_one" => {} - ".m.rule.message" => {} - ".m.rule.encrypted" => {} + ".m.rule.call" => { + if pdu.kind == EventType::CallInvite { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + ".m.rule.encrypted_room_one_to_one" => { + if db.rooms.room_members(&pdu.room_id).count() == 2 + && pdu.kind == EventType::RoomEncrypted + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + ".m.rule.room_one_to_one" => { + if db.rooms.room_members(&pdu.room_id).count() == 2 + && pdu.kind == EventType::RoomMessage + { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + ".m.rule.message" => { + if pdu.kind == EventType::RoomMessage { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } + ".m.rule.encrypted" => { + if pdu.kind == EventType::RoomEncrypted { + let tweaks = rule + .actions + .iter() + .filter_map(|a| match a { + Action::SetTweak(tweak) => Some(tweak.clone()), + _ => None, + }) + .collect::>(); + send_notice(unread, pushers, tweaks, pdu, db).await?; + } + } _ => {} } } Ok(()) } -async fn send_notice(_sender: &UserId, pusher: &Pusher, _event: &PduEvent) -> Result<()> { - if let Some(PusherKind::Http) = pusher.kind { - log::error!("YAHOOO"); - } else { - // EMAIL - todo!("send an email") +async fn send_notice( + unread: UInt, + pushers: &[Pusher], + tweaks: Vec, + event: &PduEvent, + db: &Database, +) -> Result<()> { + let (http, _emails): (Vec<&Pusher>, _) = pushers + .iter() + .partition(|pusher| pusher.kind == Some(PusherKind::Http)); + + // TODO: + // Two problems with this + // 1. if "event_id_only" is the only format kind it seems we should never add more info + // 2. can pusher/devices have conflicting formats + for pusher in http { + let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); + let url = if let Some(url) = pusher.data.url.as_ref() { + url + } else { + error!("Http Pusher must have URL specified."); + continue; + }; + + let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone()); + device.data = Some(pusher.data.clone()); + + // this is not done if "event_id_only" is the format + device.tweaks = tweaks.clone(); + + let d = &[device]; + let mut notifi = Notification::new(d); + + notifi.prio = NotificationPriority::Low; + notifi.event_id = Some(&event.event_id); + notifi.room_id = Some(&event.room_id); + // TODO: missed calls + notifi.counts = NotificationCounts::new(unread, uint!(0)); + + if event.kind == EventType::RoomEncrypted + || tweaks + .iter() + .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) + { + notifi.prio = NotificationPriority::High + } + + if event_id_only { + // send_request( + // &db.globals, + // &url, + // send_event_notification::v1::Request::new(notifi), + // ) + // .await?; + } else { + notifi.sender = Some(&event.sender); + notifi.event_type = Some(&event.kind); + notifi.content = serde_json::value::to_raw_value(&event.content).ok(); + + if event.kind == EventType::RoomMember { + notifi.user_is_target = event.state_key.as_deref() == Some(event.sender.as_str()); + } + + let name = db.users.displayname(&event.sender)?; + notifi.sender_display_name = name.as_deref(); + let room_name = db + .rooms + .room_state_get(&event.room_id, &EventType::RoomName, "")? + .map(|(_, pdu)| match pdu.content.get("name") { + Some(serde_json::Value::String(s)) => Some(s.to_string()), + _ => None, + }) + .flatten(); + notifi.room_name = room_name.as_deref(); + + send_request( + &db.globals, + &url, + send_event_notification::v1::Request::new(notifi), + ) + .await?; + } } + + // TODO: email + // for email in emails {} + Ok(()) } diff --git a/src/database/sending.rs b/src/database/sending.rs index a478501..48e427e 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -13,16 +13,11 @@ use rocket::futures::stream::{FuturesUnordered, StreamExt}; use ruma::{ api::{appservice, federation, OutgoingRequest}, events::{push_rules, EventType}, - ServerName, + uint, ServerName, UInt, }; use sled::IVec; use tokio::{select, sync::Semaphore}; -use super::{ - account_data::AccountData, appservice::Appservice, globals::Globals, pusher::PushData, - rooms::Rooms, -}; - #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum OutgoingKind { Appservice(Box), @@ -52,11 +47,7 @@ impl Sending { pub fn start_handler(&self, db: &Database) { let servernamepduids = self.servernamepduids.clone(); let servercurrentpdus = self.servercurrentpdus.clone(); - let rooms = db.rooms.clone(); - let globals = db.globals.clone(); - let appservice = db.appservice.clone(); - let pusher = db.pusher.clone(); - let account_data = db.account_data.clone(); + let db = db.clone(); tokio::spawn(async move { let mut futures = FuturesUnordered::new(); @@ -79,15 +70,7 @@ impl Sending { } for (outgoing_kind, pdus) in current_transactions { - futures.push(Self::handle_event( - outgoing_kind, - pdus, - &rooms, - &globals, - &appservice, - &pusher, - &account_data, - )); + futures.push(Self::handle_event(outgoing_kind, pdus, &db)); } let mut last_failed_try: HashMap = HashMap::new(); @@ -151,11 +134,7 @@ impl Sending { Self::handle_event( outgoing_kind.clone(), new_pdus, - &rooms, - &globals, - &appservice, - &pusher, - &account_data + &db, ) ); } else { @@ -275,11 +254,7 @@ impl Sending { Self::handle_event( outgoing_kind, vec![pdu_id.into()], - &rooms, - &globals, - &appservice, - &pusher, - &account_data + &db, ) ); } @@ -325,14 +300,11 @@ impl Sending { Ok(()) } + // TODO this is the whole DB but is it better to clone smaller parts than the whole thing?? async fn handle_event( kind: OutgoingKind, pdu_ids: Vec, - rooms: &Rooms, - globals: &Globals, - appservice: &Appservice, - pusher: &PushData, - account_data: &AccountData, + db: &Database, ) -> std::result::Result { match kind { OutgoingKind::Appservice(server) => { @@ -340,7 +312,7 @@ impl Sending { .iter() .map(|pdu_id| { Ok::<_, (Box, Error)>( - rooms + db.rooms .get_pdu_from_id(pdu_id) .map_err(|e| (server.clone(), e))? .ok_or_else(|| { @@ -357,8 +329,8 @@ impl Sending { .filter_map(|r| r.ok()) .collect::>(); appservice_server::send_request( - &globals, - appservice + &db.globals, + db.appservice .get_registration(server.as_str()) .unwrap() .unwrap(), // TODO: handle error @@ -376,7 +348,7 @@ impl Sending { .iter() .map(|pdu_id| { Ok::<_, (Vec, Error)>( - rooms + db.rooms .get_pdu_from_id(pdu_id) .map_err(|e| (id.clone(), e))? .ok_or_else(|| { @@ -391,36 +363,67 @@ impl Sending { }) .filter_map(|r| r.ok()) .collect::>(); + dbg!(&pdus); + for pdu in &pdus { - for user in rooms.room_members(&pdu.room_id) { + // Redacted events are not notification targets (we don't send push for them) + if pdu.unsigned.get("redacted_because").is_some() { + continue; + } + for user in db.rooms.room_members(&pdu.room_id) { dbg!(&user); + let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; - for pusher in pusher + let pushers = db + .pusher .get_pusher(&user) + .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; + + let rules_for_user = db + .account_data + .get::(None, &user, EventType::PushRules) + .map_err(|e| (OutgoingKind::Push(id.clone()), e))? + .map(|ev| ev.content.global) + .unwrap_or_else(|| crate::push_rules::default_pushrules(&user)); + + let unread: UInt = if let Some(last_read) = db + .rooms + .edus + .private_read_get(&pdu.room_id, &user) .map_err(|e| (OutgoingKind::Push(id.clone()), e))? { - let rules_for_user = account_data - .get::( - None, - &user, - EventType::PushRules, - ) + (db.rooms + .pdus_since(&user, &pdu.room_id, last_read) .map_err(|e| (OutgoingKind::Push(id.clone()), e))? - .map(|ev| ev.content.global) - .unwrap_or_else(|| crate::push_rules::default_pushrules(&user)); - dbg!(&pusher); - dbg!(&rules_for_user); + .filter_map(|pdu| pdu.ok()) // Filter out buggy events + .filter(|(_, pdu)| { + matches!( + pdu.kind.clone(), + EventType::RoomMessage | EventType::RoomEncrypted + ) + }) + .count() as u32) + .into() + } else { + // Just return zero unread messages + uint!(0) + }; - crate::database::pusher::send_push_notice( - &user, - &pusher, - rules_for_user, - pdu, - ) - .await - .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; - } + dbg!(&pushers); + + // dbg!(&rules_for_user); + + crate::database::pusher::send_push_notice( + &user, + unread, + &pushers, + rules_for_user, + pdu, + db, + ) + .await + .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; } } @@ -434,7 +437,7 @@ impl Sending { // TODO: check room version and remove event_id if needed serde_json::from_str( PduEvent::convert_to_outgoing_federation_event( - rooms + db.rooms .get_pdu_json_from_id(pdu_id) .map_err(|e| (OutgoingKind::Normal(server.clone()), e))? .ok_or_else(|| { @@ -456,10 +459,10 @@ impl Sending { .collect::>(); server_server::send_request( - &globals, + &db.globals, &*server, send_transaction_message::v1::Request { - origin: globals.server_name(), + origin: db.globals.server_name(), pdus: &pdu_jsons, edus: &[], origin_server_ts: SystemTime::now(), diff --git a/src/server_server.rs b/src/server_server.rs index ad0a1a4..03952eb 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,7 +1,6 @@ use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; -use get_profile_information::v1::ProfileField; use http::header::{HeaderValue, AUTHORIZATION, HOST}; -use log::{error, info, warn}; +use log::{info, warn}; use rocket::{get, post, put, response::content::Json, State}; use ruma::{ api::{ @@ -12,7 +11,7 @@ use ruma::{ VerifyKey, }, event::{get_event, get_missing_events, get_room_state_ids}, - query::get_profile_information, + query::get_profile_information::{self, v1::ProfileField}, transactions::send_transaction_message, }, OutgoingRequest, @@ -222,7 +221,7 @@ fn add_port_to_hostname(destination_str: String) -> String { /// Returns: actual_destination, host header /// Implemented according to the specification at https://matrix.org/docs/spec/server_server/r0.1.4#resolving-server-names /// Numbers in comments below refer to bullet points in linked section of specification -async fn find_actual_destination( +pub(crate) async fn find_actual_destination( globals: &crate::database::globals::Globals, destination: &ServerName, ) -> (String, Option) {