From 835cf80acd6be7d99b5bdc83c7f891dc167901d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20K=C3=B6sters?= Date: Thu, 11 Feb 2021 13:16:14 +0100 Subject: [PATCH] fix: pushers --- src/database/pusher.rs | 62 ++++++++++++++++++++++++++--------------- src/database/rooms.rs | 6 +++- src/database/sending.rs | 55 +++++++++++------------------------- src/server_server.rs | 3 +- 4 files changed, 63 insertions(+), 63 deletions(-) diff --git a/src/database/pusher.rs b/src/database/pusher.rs index c4f5801..336ef57 100644 --- a/src/database/pusher.rs +++ b/src/database/pusher.rs @@ -35,7 +35,10 @@ impl PushData { } pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> { + println!("CCCCCCCCCCCCCCCCCCCCCc"); + dbg!(&pusher); let mut key = sender.as_bytes().to_vec(); + key.push(0xff); key.extend_from_slice(pusher.pushkey.as_bytes()); // There are 2 kinds of pushers but the spec says: null deletes the pusher. @@ -48,7 +51,7 @@ impl PushData { } self.senderkey_pusher.insert( - key, + dbg!(key), &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"), )?; @@ -56,11 +59,16 @@ impl PushData { } pub fn get_pusher(&self, sender: &UserId) -> Result> { + let mut prefix = sender.as_bytes().to_vec(); + prefix.push(0xff); + self.senderkey_pusher - .scan_prefix(sender.as_bytes()) + .scan_prefix(dbg!(prefix)) .values() - .map(|push: std::result::Result| { - let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?; + .map(|push| { + println!("DDDDDDDDDDDDDDDDDDDDDDDDDD"); + let push = + dbg!(push).map_err(|_| Error::bad_database("Invalid push bytes in db."))?; Ok(serde_json::from_slice(&*push) .map_err(|_| Error::bad_database("Invalid Pusher in db."))?) }) @@ -85,14 +93,17 @@ where Error::BadServerResponse("Invalid destination") })?; - let mut reqwest_request = reqwest::Request::try_from(http_request) + let reqwest_request = reqwest::Request::try_from(http_request) .expect("all http requests are valid reqwest requests"); // TODO: we could keep this very short and let expo backoff do it's thing... - *reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); + //*reqwest_request.timeout_mut() = Some(Duration::from_secs(5)); let url = reqwest_request.url().clone(); - let reqwest_response = globals.reqwest_client().execute(reqwest_request).await; + let reqwest_response = globals + .reqwest_client() + .execute(dbg!(reqwest_request)) + .await; // Because reqwest::Response -> http::Response is complicated: match reqwest_response { @@ -154,6 +165,12 @@ pub async fn send_push_notice( pdu: &PduEvent, db: &Database, ) -> Result<()> { + if let Some(msgtype) = pdu.content.get("msgtype").and_then(|b| b.as_str()) { + if msgtype == "m.notice" { + return Ok(()); + } + } + for rule in ruleset.into_iter() { // TODO: can actions contain contradictory Actions if rule @@ -165,7 +182,7 @@ pub async fn send_push_notice( continue; } - match rule.rule_id.as_str() { + match dbg!(rule.rule_id.as_str()) { ".m.rule.master" => {} ".m.rule.suppress_notices" => { if pdu.kind == EventType::RoomMessage @@ -437,7 +454,8 @@ async fn send_notice( db: &Database, name: &str, ) -> Result<()> { - let (http, _emails): (Vec<&Pusher>, _) = pushers + println!("BBBBBBBBBBBBBBBr"); + let (http, _emails): (Vec<&Pusher>, _) = dbg!(pushers) .iter() .partition(|pusher| pusher.kind == Some(PusherKind::Http)); @@ -445,7 +463,7 @@ async fn send_notice( // Two problems with this // 1. if "event_id_only" is the only format kind it seems we should never add more info // 2. can pusher/devices have conflicting formats - for pusher in http { + for pusher in dbg!(http) { let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly); let url = if let Some(url) = pusher.data.url.as_ref() { url @@ -484,12 +502,12 @@ async fn send_notice( if event_id_only { error!("SEND PUSH NOTICE `{}`", name); - // send_request( - // &db.globals, - // &url, - // send_event_notification::v1::Request::new(notifi), - // ) - // .await?; + send_request( + &db.globals, + &url, + send_event_notification::v1::Request::new(notifi), + ) + .await?; } else { notifi.sender = Some(&event.sender); notifi.event_type = Some(&event.kind); @@ -512,12 +530,12 @@ async fn send_notice( notifi.room_name = room_name.as_deref(); error!("SEND PUSH NOTICE Full `{}`", name); - // send_request( - // &db.globals, - // &url, - // send_event_notification::v1::Request::new(notifi), - // ) - // .await?; + send_request( + &db.globals, + &url, + send_event_notification::v1::Request::new(notifi), + ) + .await?; } } diff --git a/src/database/rooms.rs b/src/database/rooms.rs index ac7d27d..0f02e33 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -1358,6 +1358,7 @@ impl Rooms { self.alias_roomid .insert(alias.alias(), room_id.as_bytes())?; let mut aliasid = room_id.as_bytes().to_vec(); + aliasid.push(0xff); aliasid.extend_from_slice(&globals.next_count()?.to_be_bytes()); self.aliasid_alias.insert(aliasid, &*alias.alias())?; } else { @@ -1370,7 +1371,10 @@ impl Rooms { "Alias does not exist.", ))?; - for key in self.aliasid_alias.scan_prefix(room_id).keys() { + let mut prefix = room_id.to_vec(); + prefix.push(0xff); + + for key in self.aliasid_alias.scan_prefix(prefix).keys() { self.aliasid_alias.remove(key?)?; } } diff --git a/src/database/sending.rs b/src/database/sending.rs index ce81e8c..cbe9ffa 100644 --- a/src/database/sending.rs +++ b/src/database/sending.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, convert::TryFrom, - fmt::{Debug, Display, Formatter}, + fmt::Debug, sync::Arc, time::{Duration, Instant, SystemTime}, }; @@ -25,16 +25,6 @@ pub enum OutgoingKind { Normal(Box), } -impl Display for OutgoingKind { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - OutgoingKind::Appservice(name) => f.write_str(name.as_str()), - OutgoingKind::Normal(name) => f.write_str(name.as_str()), - OutgoingKind::Push(_) => f.write_str("Push notification TODO"), - } - } -} - #[derive(Clone)] pub struct Sending { /// The state for a given state hash. @@ -143,7 +133,7 @@ impl Sending { } } Err((outgoing_kind, e)) => { - info!("Couldn't send transaction to {}\n{}", outgoing_kind, e); + info!("Couldn't send transaction to {:?}\n{}", outgoing_kind, e); let mut prefix = match &outgoing_kind { OutgoingKind::Appservice(serv) => { let mut p = b"+".to_vec(); @@ -278,6 +268,8 @@ impl Sending { key.extend_from_slice(pdu_id); self.servernamepduids.insert(key, b"")?; + println!("AAAA"); + Ok(()) } @@ -306,7 +298,7 @@ impl Sending { pdu_ids: Vec, db: &Database, ) -> std::result::Result { - match kind { + match dbg!(kind) { OutgoingKind::Appservice(server) => { let pdu_jsons = pdu_ids .iter() @@ -364,25 +356,12 @@ impl Sending { .filter_map(|r| r.ok()) .collect::>(); - for pdu in &pdus { + for pdu in dbg!(&pdus) { // Redacted events are not notification targets (we don't send push for them) if pdu.unsigned.get("redacted_because").is_some() { continue; } - // Skip events that came from the admin room - if db - .rooms - .room_aliases(&pdu.room_id) - .any(|alias| match alias { - Ok(a) => a.as_str().starts_with("#admins:"), - _ => false, - }) - || pdu.sender.as_str().starts_with("@conduit:") - { - continue; - } - for user in db.rooms.room_members(&pdu.room_id) { let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; @@ -391,9 +370,7 @@ impl Sending { continue; } - let pushers = db - .pusher - .get_pusher(&user) + let pushers = dbg!(db.pusher.get_pusher(&user)) .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; let rules_for_user = db @@ -426,15 +403,17 @@ impl Sending { uint!(0) }; - crate::database::pusher::send_push_notice( - &user, - unread, - &pushers, - rules_for_user, - pdu, - db, + dbg!( + crate::database::pusher::send_push_notice( + &user, + unread, + &pushers, + rules_for_user, + pdu, + db, + ) + .await ) - .await .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; } } diff --git a/src/server_server.rs b/src/server_server.rs index 03952eb..a8946a9 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -1,6 +1,6 @@ use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma}; use http::header::{HeaderValue, AUTHORIZATION, HOST}; -use log::{info, warn}; +use log::{error, info, warn}; use rocket::{get, post, put, response::content::Json, State}; use ruma::{ api::{ @@ -17,7 +17,6 @@ use ruma::{ OutgoingRequest, }, directory::{IncomingFilter, IncomingRoomNetwork}, - events::EventType, serde::to_canonical_value, signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap}, EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,