WIP: send out push notification, impl pusher routes
It seems in order to test this I may also have to impl the email 3pid route? I need to call the set_pusher route somehow.
This commit is contained in:
		
							parent
							
								
									a0ecd76e21
								
							
						
					
					
						commit
						2d69e81699
					
				
					 6 changed files with 514 additions and 166 deletions
				
			
		|  | @ -659,3 +659,17 @@ pub async fn deactivate_route( | |||
|     } | ||||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| /*/ | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     get("/_matrix/client/r0/account/3pid", data = "<body>") | ||||
| )] | ||||
| pub async fn third_party_route( | ||||
|     body: Ruma<account::add_3pid::Request<'_>>, | ||||
| ) -> ConduitResult<account::add_3pid::Response> { | ||||
|     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
| 
 | ||||
|     Ok(account::add_3pid::Response::default().into()) | ||||
| } | ||||
| */ | ||||
|  |  | |||
|  | @ -666,20 +666,36 @@ pub async fn delete_pushrule_route( | |||
|     Ok(delete_pushrule::Response.into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/pushers"))] | ||||
| pub async fn get_pushers_route() -> ConduitResult<get_pushers::Response> { | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     get("/_matrix/client/r0/pushers", data = "<body>") | ||||
| )] | ||||
| pub async fn get_pushers_route( | ||||
|     db: State<'_, Database>, | ||||
|     body: Ruma<get_pushers::Request>, | ||||
| ) -> ConduitResult<get_pushers::Response> { | ||||
|     let sender = body.sender_user.as_ref().expect("authenticated endpoint"); | ||||
| 
 | ||||
|     Ok(get_pushers::Response { | ||||
|         pushers: Vec::new(), | ||||
|         pushers: db.pusher.get_pusher(sender)?, | ||||
|     } | ||||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(feature = "conduit_bin", post("/_matrix/client/r0/pushers/set"))] | ||||
| pub async fn set_pushers_route(db: State<'_, Database>) -> ConduitResult<get_pushers::Response> { | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     post("/_matrix/client/r0/pushers/set", data = "<body>") | ||||
| )] | ||||
| pub async fn set_pushers_route( | ||||
|     db: State<'_, Database>, | ||||
|     body: Ruma<set_pusher::Request>, | ||||
| ) -> ConduitResult<set_pusher::Response> { | ||||
|     let sender = body.sender_user.as_ref().expect("authenticated endpoint"); | ||||
|     let pusher = body.pusher.clone(); | ||||
| 
 | ||||
|     db.pusher.set_pusher(sender, pusher)?; | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     Ok(get_pushers::Response { | ||||
|         pushers: Vec::new(), | ||||
|     } | ||||
|     .into()) | ||||
|     Ok(set_pusher::Response::default().into()) | ||||
| } | ||||
|  |  | |||
|  | @ -4,6 +4,7 @@ pub mod appservice; | |||
| pub mod globals; | ||||
| pub mod key_backups; | ||||
| pub mod media; | ||||
| pub mod pusher; | ||||
| pub mod rooms; | ||||
| pub mod sending; | ||||
| pub mod transaction_ids; | ||||
|  | @ -17,9 +18,11 @@ use log::info; | |||
| use rocket::futures::{self, channel::mpsc}; | ||||
| use ruma::{DeviceId, ServerName, UserId}; | ||||
| use serde::Deserialize; | ||||
| use std::collections::HashMap; | ||||
| use std::fs::remove_dir_all; | ||||
| use std::sync::{Arc, RwLock}; | ||||
| use std::{ | ||||
|     collections::HashMap, | ||||
|     fs::remove_dir_all, | ||||
|     sync::{Arc, RwLock}, | ||||
| }; | ||||
| use tokio::sync::Semaphore; | ||||
| 
 | ||||
| #[derive(Clone, Debug, Deserialize)] | ||||
|  | @ -73,6 +76,7 @@ pub struct Database { | |||
|     pub sending: sending::Sending, | ||||
|     pub admin: admin::Admin, | ||||
|     pub appservice: appservice::Appservice, | ||||
|     pub pusher: pusher::PushData, | ||||
|     pub _db: sled::Db, | ||||
| } | ||||
| 
 | ||||
|  | @ -187,6 +191,7 @@ impl Database { | |||
|                 cached_registrations: Arc::new(RwLock::new(HashMap::new())), | ||||
|                 id_appserviceregistrations: db.open_tree("id_appserviceregistrations")?, | ||||
|             }, | ||||
|             pusher: pusher::PushData::new(&db)?, | ||||
|             _db: db, | ||||
|         }; | ||||
| 
 | ||||
|  |  | |||
							
								
								
									
										148
									
								
								src/database/pusher.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										148
									
								
								src/database/pusher.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,148 @@ | |||
| use crate::{Error, PduEvent, Result}; | ||||
| use ruma::{ | ||||
|     api::client::r0::push::{Pusher, PusherKind}, | ||||
|     events::{ | ||||
|         room::{ | ||||
|             member::MemberEventContent, | ||||
|             message::{MessageEventContent, TextMessageEventContent}, | ||||
|         }, | ||||
|         EventType, | ||||
|     }, | ||||
|     push::{PushCondition, Ruleset}, | ||||
|     UserId, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct PushData { | ||||
|     /// UserId + pushkey -> Pusher
 | ||||
|     pub(super) senderkey_pusher: sled::Tree, | ||||
| } | ||||
| 
 | ||||
| impl PushData { | ||||
|     pub fn new(db: &sled::Db) -> Result<Self> { | ||||
|         Ok(Self { | ||||
|             senderkey_pusher: db.open_tree("senderkey_pusher")?, | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     pub fn set_pusher(&self, sender: &UserId, pusher: Pusher) -> Result<()> { | ||||
|         let mut key = sender.as_bytes().to_vec(); | ||||
|         key.extend_from_slice(pusher.pushkey.as_bytes()); | ||||
| 
 | ||||
|         self.senderkey_pusher.insert( | ||||
|             key, | ||||
|             &*serde_json::to_string(&pusher).expect("Pusher is valid JSON string"), | ||||
|         )?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn get_pusher(&self, sender: &UserId) -> Result<Vec<Pusher>> { | ||||
|         self.senderkey_pusher | ||||
|             .scan_prefix(sender.as_bytes()) | ||||
|             .values() | ||||
|             .map(|push: std::result::Result<sled::IVec, _>| { | ||||
|                 let push = push.map_err(|_| Error::bad_database("Invalid push bytes in db."))?; | ||||
|                 Ok(serde_json::from_slice(&*push) | ||||
|                     .map_err(|_| Error::bad_database("Invalid Pusher in db."))?) | ||||
|             }) | ||||
|             .collect::<Result<Vec<_>>>() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| pub async fn send_push_notice( | ||||
|     user: &UserId, | ||||
|     pusher: &Pusher, | ||||
|     ruleset: Ruleset, | ||||
|     pdu: &PduEvent, | ||||
| ) -> Result<()> { | ||||
|     for rule in ruleset.into_iter() { | ||||
|         // TODO: can actions contain contradictory Actions
 | ||||
|         if rule | ||||
|             .actions | ||||
|             .iter() | ||||
|             .any(|act| matches!(act, ruma::push::Action::DontNotify)) | ||||
|             || !rule.enabled | ||||
|         { | ||||
|             continue; | ||||
|         } | ||||
| 
 | ||||
|         match rule.rule_id.as_str() { | ||||
|             ".m.rule.master" => {} | ||||
|             ".m.rule.suppress_notices" => {} | ||||
|             ".m.rule.invite_for_me" => {} | ||||
|             ".m.rule.member_event" => { | ||||
|                 if let EventType::RoomMember = &pdu.kind { | ||||
|                     // TODO use this?
 | ||||
|                     let _member = serde_json::from_value::<MemberEventContent>(pdu.content.clone()) | ||||
|                         .map_err(|_| Error::bad_database("PDU contained bad message content"))?; | ||||
|                     if let Some(conditions) = rule.conditions { | ||||
|                         if conditions.iter().any(|cond| match cond { | ||||
|                             PushCondition::EventMatch { key, pattern } => { | ||||
|                                 let mut json = | ||||
|                                     serde_json::to_value(pdu).expect("PDU is valid JSON"); | ||||
|                                 for key in key.split('.') { | ||||
|                                     json = json[key].clone(); | ||||
|                                 } | ||||
|                                 // TODO: this is baddddd
 | ||||
|                                 json.to_string().contains(pattern) | ||||
|                             } | ||||
|                             _ => false, | ||||
|                         }) {} | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             ".m.rule.contains_display_name" => { | ||||
|                 if let EventType::RoomMessage = &pdu.kind { | ||||
|                     let msg_content = | ||||
|                         serde_json::from_value::<MessageEventContent>(pdu.content.clone()) | ||||
|                             .map_err(|_| { | ||||
|                                 Error::bad_database("PDU contained bad message content") | ||||
|                             })?; | ||||
|                     if let MessageEventContent::Text(TextMessageEventContent { body, .. }) = | ||||
|                         &msg_content | ||||
|                     { | ||||
|                         if body.contains(user.localpart()) { | ||||
|                             send_notice(user, &pusher, &pdu).await?; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             ".m.rule.tombstone" => {} | ||||
|             ".m.rule.roomnotif" => {} | ||||
|             ".m.rule.contains_user_name" => { | ||||
|                 if let EventType::RoomMessage = &pdu.kind { | ||||
|                     let msg_content = | ||||
|                         serde_json::from_value::<MessageEventContent>(pdu.content.clone()) | ||||
|                             .map_err(|_| { | ||||
|                                 Error::bad_database("PDU contained bad message content") | ||||
|                             })?; | ||||
|                     if let MessageEventContent::Text(TextMessageEventContent { body, .. }) = | ||||
|                         &msg_content | ||||
|                     { | ||||
|                         if body.contains(user.localpart()) { | ||||
|                             send_notice(user, &pusher, &pdu).await?; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             ".m.rule.call" => {} | ||||
|             ".m.rule.encrypted_room_one_to_one" => {} | ||||
|             ".m.rule.room_one_to_one" => {} | ||||
|             ".m.rule.message" => {} | ||||
|             ".m.rule.encrypted" => {} | ||||
|             _ => {} | ||||
|         } | ||||
|     } | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| async fn send_notice(_sender: &UserId, pusher: &Pusher, _event: &PduEvent) -> Result<()> { | ||||
|     if let Some(PusherKind::Http) = pusher.kind { | ||||
|         log::error!("YAHOOO"); | ||||
|     } else { | ||||
|         // EMAIL
 | ||||
|         todo!("send an email") | ||||
|     } | ||||
|     Ok(()) | ||||
| } | ||||
|  | @ -531,6 +531,9 @@ impl Rooms { | |||
|         self.eventid_pduid | ||||
|             .insert(pdu.event_id.as_bytes(), &*pdu_id)?; | ||||
| 
 | ||||
|         // See if the event matches any known pushers
 | ||||
|         db.sending.send_push_pdu(&*pdu_id)?; | ||||
| 
 | ||||
|         match pdu.kind { | ||||
|             EventType::RoomRedaction => { | ||||
|                 if let Some(redact_id) = &pdu.redacts { | ||||
|  |  | |||
|  | @ -1,43 +1,62 @@ | |||
| use std::{ | ||||
|     collections::HashMap, | ||||
|     convert::TryFrom, | ||||
|     fmt::Debug, | ||||
|     fmt::{Debug, Display, Formatter}, | ||||
|     sync::Arc, | ||||
|     time::{Duration, Instant, SystemTime}, | ||||
| }; | ||||
| 
 | ||||
| use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; | ||||
| use crate::{appservice_server, server_server, utils, Database, Error, PduEvent, Result}; | ||||
| use federation::transactions::send_transaction_message; | ||||
| use log::info; | ||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||
| use ruma::{ | ||||
|     api::{appservice, federation, OutgoingRequest}, | ||||
|     events::{push_rules, EventType}, | ||||
|     ServerName, | ||||
| }; | ||||
| use sled::IVec; | ||||
| use tokio::select; | ||||
| use tokio::sync::Semaphore; | ||||
| use tokio::{select, sync::Semaphore}; | ||||
| 
 | ||||
| use super::{ | ||||
|     account_data::AccountData, appservice::Appservice, globals::Globals, pusher::PushData, | ||||
|     rooms::Rooms, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Clone, Debug, PartialEq, Eq, Hash)] | ||||
| pub enum OutgoingKind { | ||||
|     Appservice(Box<ServerName>), | ||||
|     Push(Vec<u8>), | ||||
|     Normal(Box<ServerName>), | ||||
| } | ||||
| 
 | ||||
| impl Display for OutgoingKind { | ||||
|     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||||
|         match self { | ||||
|             OutgoingKind::Appservice(name) => f.write_str(name.as_str()), | ||||
|             OutgoingKind::Normal(name) => f.write_str(name.as_str()), | ||||
|             OutgoingKind::Push(_) => f.write_str("Push notification TODO"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Sending { | ||||
|     /// The state for a given state hash.
 | ||||
|     pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+)ServerName + PduId
 | ||||
|     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+)ServerName + PduId (pduid can be empty for reservation)
 | ||||
|     pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)ServerName / UserId + PduId
 | ||||
|     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation)
 | ||||
|     pub(super) maximum_requests: Arc<Semaphore>, | ||||
| } | ||||
| 
 | ||||
| impl Sending { | ||||
|     pub fn start_handler( | ||||
|         &self, | ||||
|         globals: &super::globals::Globals, | ||||
|         rooms: &super::rooms::Rooms, | ||||
|         appservice: &super::appservice::Appservice, | ||||
|     ) { | ||||
|     pub fn start_handler(&self, db: &Database) { | ||||
|         let servernamepduids = self.servernamepduids.clone(); | ||||
|         let servercurrentpdus = self.servercurrentpdus.clone(); | ||||
|         let rooms = rooms.clone(); | ||||
|         let globals = globals.clone(); | ||||
|         let appservice = appservice.clone(); | ||||
|         let rooms = db.rooms.clone(); | ||||
|         let globals = db.globals.clone(); | ||||
|         let appservice = db.appservice.clone(); | ||||
|         let pusher = db.pusher.clone(); | ||||
|         let account_data = db.account_data.clone(); | ||||
| 
 | ||||
|         tokio::spawn(async move { | ||||
|             let mut futures = FuturesUnordered::new(); | ||||
|  | @ -45,45 +64,57 @@ impl Sending { | |||
|             // Retry requests we could not finish yet
 | ||||
|             let mut current_transactions = HashMap::new(); | ||||
| 
 | ||||
|             for (server, pdu, is_appservice) in servercurrentpdus | ||||
|             for (outgoing_kind, pdu) in servercurrentpdus | ||||
|                 .iter() | ||||
|                 .filter_map(|r| r.ok()) | ||||
|                 .filter_map(|(key, _)| Self::parse_servercurrentpdus(key).ok()) | ||||
|                 .filter(|(_, pdu, _)| !pdu.is_empty()) // Skip reservation key
 | ||||
|                 .filter(|(_, pdu)| !pdu.is_empty()) // Skip reservation key
 | ||||
|                 .take(50) | ||||
|             // This should not contain more than 50 anyway
 | ||||
|             { | ||||
|                 current_transactions | ||||
|                     .entry((server, is_appservice)) | ||||
|                     .entry(outgoing_kind) | ||||
|                     .or_insert_with(Vec::new) | ||||
|                     .push(pdu); | ||||
|             } | ||||
| 
 | ||||
|             for ((server, is_appservice), pdus) in current_transactions { | ||||
|             for (outgoing_kind, pdus) in current_transactions { | ||||
|                 futures.push(Self::handle_event( | ||||
|                     server, | ||||
|                     is_appservice, | ||||
|                     outgoing_kind, | ||||
|                     pdus, | ||||
|                     &globals, | ||||
|                     &rooms, | ||||
|                     &globals, | ||||
|                     &appservice, | ||||
|                     &pusher, | ||||
|                     &account_data, | ||||
|                 )); | ||||
|             } | ||||
| 
 | ||||
|             let mut last_failed_try: HashMap<Box<ServerName>, (u32, Instant)> = HashMap::new(); | ||||
|             let mut last_failed_try: HashMap<OutgoingKind, (u32, Instant)> = HashMap::new(); | ||||
| 
 | ||||
|             let mut subscriber = servernamepduids.watch_prefix(b""); | ||||
|             loop { | ||||
|                 select! { | ||||
|                     Some(response) = futures.next() => { | ||||
|                         match response { | ||||
|                             Ok((server, is_appservice)) => { | ||||
|                                 let mut prefix = if is_appservice { | ||||
|                                     b"+".to_vec() | ||||
|                                 } else { | ||||
|                                     Vec::new() | ||||
|                             Ok(outgoing_kind) => { | ||||
|                                 let mut prefix = match &outgoing_kind { | ||||
|                                     OutgoingKind::Appservice(server) => { | ||||
|                                         let mut p = b"+".to_vec(); | ||||
|                                         p.extend_from_slice(server.as_bytes()); | ||||
|                                         p | ||||
|                                     } | ||||
|                                     OutgoingKind::Push(id) => { | ||||
|                                         let mut p = b"$".to_vec(); | ||||
|                                         p.extend_from_slice(&id); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                     OutgoingKind::Normal(server) => { | ||||
|                                         let mut p = vec![]; | ||||
|                                         p.extend_from_slice(server.as_bytes()); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                 }; | ||||
|                                 prefix.extend_from_slice(server.as_bytes()); | ||||
|                                 prefix.push(0xff); | ||||
| 
 | ||||
|                                 for key in servercurrentpdus | ||||
|  | @ -116,22 +147,45 @@ impl Sending { | |||
|                                         servernamepduids.remove(¤t_key).unwrap(); | ||||
|                                     } | ||||
| 
 | ||||
|                                     futures.push(Self::handle_event(server, is_appservice, new_pdus, &globals, &rooms, &appservice)); | ||||
|                                     futures.push( | ||||
|                                         Self::handle_event( | ||||
|                                             outgoing_kind.clone(), | ||||
|                                             new_pdus, | ||||
|                                             &rooms, | ||||
|                                             &globals, | ||||
|                                             &appservice, | ||||
|                                             &pusher, | ||||
|                                             &account_data | ||||
|                                         ) | ||||
|                                     ); | ||||
|                                 } else { | ||||
|                                     servercurrentpdus.remove(&prefix).unwrap(); | ||||
|                                     // servercurrentpdus with the prefix should be empty now
 | ||||
|                                 } | ||||
|                             } | ||||
|                             Err((server, is_appservice, e)) => { | ||||
|                                 info!("Couldn't send transaction to {}\n{}", server, e); | ||||
|                                 let mut prefix = if is_appservice { | ||||
|                                     b"+".to_vec() | ||||
|                                 } else { | ||||
|                                     Vec::new() | ||||
|                             Err((outgoing_kind, e)) => { | ||||
|                                 info!("Couldn't send transaction to {}\n{}", outgoing_kind, e); | ||||
|                                 let mut prefix = match &outgoing_kind { | ||||
|                                     OutgoingKind::Appservice(serv) => { | ||||
|                                         let mut p = b"+".to_vec(); | ||||
|                                         p.extend_from_slice(serv.as_bytes()); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                     OutgoingKind::Push(id) => { | ||||
|                                         let mut p = b"$".to_vec(); | ||||
|                                         p.extend_from_slice(&id); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                     OutgoingKind::Normal(serv) => { | ||||
|                                         let mut p = vec![]; | ||||
|                                         p.extend_from_slice(serv.as_bytes()); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                 }; | ||||
|                                 prefix.extend_from_slice(server.as_bytes()); | ||||
| 
 | ||||
|                                 prefix.push(0xff); | ||||
|                                 last_failed_try.insert(server.clone(), match last_failed_try.get(&server) { | ||||
| 
 | ||||
|                                 last_failed_try.insert(outgoing_kind.clone(), match last_failed_try.get(&outgoing_kind) { | ||||
|                                     Some(last_failed) => { | ||||
|                                         (last_failed.0+1, Instant::now()) | ||||
|                                     }, | ||||
|  | @ -157,40 +211,56 @@ impl Sending { | |||
| 
 | ||||
|                                 instant.elapsed() < min_elapsed_duration | ||||
|                             }; | ||||
|                             if let Some((server, is_appservice, pdu_id)) = utils::string_from_bytes( | ||||
|                             if let Some((outgoing_kind, pdu_id)) = utils::string_from_bytes( | ||||
|                                     parts | ||||
|                                         .next() | ||||
|                                         .expect("splitn will always return 1 or more elements"), | ||||
|                                 ) | ||||
|                                 .map_err(|_| Error::bad_database("ServerName in servernamepduid bytes are invalid.")) | ||||
|                                 .map(|server_str| { | ||||
|                                 .map_err(|_| Error::bad_database("[Utf8] ServerName in servernamepduid bytes are invalid.")) | ||||
|                                 .and_then(|ident_str| { | ||||
|                                     // Appservices start with a plus
 | ||||
|                                     if server_str.starts_with('+') { | ||||
|                                         (server_str[1..].to_owned(), true) | ||||
|                                     Ok(if ident_str.starts_with('+') { | ||||
|                                         OutgoingKind::Appservice( | ||||
|                                             Box::<ServerName>::try_from(&ident_str[1..]) | ||||
|                                                 .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))? | ||||
|                                         ) | ||||
|                                     } else if ident_str.starts_with('$') { | ||||
|                                         OutgoingKind::Push(ident_str[1..].as_bytes().to_vec()) | ||||
|                                     } else { | ||||
|                                         (server_str, false) | ||||
|                                     } | ||||
|                                         OutgoingKind::Normal( | ||||
|                                             Box::<ServerName>::try_from(ident_str) | ||||
|                                                 .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))? | ||||
|                                         ) | ||||
|                                     }) | ||||
|                                 .and_then(|(server_str, is_appservice)| Box::<ServerName>::try_from(server_str) | ||||
|                                     .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid.")).map(|s| (s, is_appservice))) | ||||
|                                 .ok() | ||||
|                                 .and_then(|(server, is_appservice)| parts | ||||
|                                 }) | ||||
|                                 .and_then(|outgoing_kind| parts | ||||
|                                     .next() | ||||
|                                     .ok_or_else(|| Error::bad_database("Invalid servernamepduid in db.")) | ||||
|                                     .ok() | ||||
|                                     .map(|pdu_id| (server, is_appservice, pdu_id)) | ||||
|                                     .map(|pdu_id| (outgoing_kind, pdu_id)) | ||||
|                                 ) | ||||
|                                 .filter(|(server, is_appservice, _)| { | ||||
|                                     if last_failed_try.get(server).map_or(false, exponential_backoff) { | ||||
|                                 .ok() | ||||
|                                 .filter(|(outgoing_kind, _)| { | ||||
|                                     if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) { | ||||
|                                         return false; | ||||
|                                     } | ||||
| 
 | ||||
|                                     let mut prefix = if *is_appservice { | ||||
|                                         b"+".to_vec() | ||||
|                                     } else { | ||||
|                                         Vec::new() | ||||
|                                     let mut prefix = match outgoing_kind { | ||||
|                                         OutgoingKind::Appservice(serv) => { | ||||
|                                             let mut p = b"+".to_vec(); | ||||
|                                             p.extend_from_slice(serv.as_bytes()); | ||||
|                                             p | ||||
|                                     }, | ||||
|                                         OutgoingKind::Push(id) => { | ||||
|                                             let mut p = b"$".to_vec(); | ||||
|                                             p.extend_from_slice(&id); | ||||
|                                             p | ||||
|                                         }, | ||||
|                                         OutgoingKind::Normal(serv) => { | ||||
|                                             let mut p = vec![]; | ||||
|                                             p.extend_from_slice(serv.as_bytes()); | ||||
|                                             p | ||||
|                                         }, | ||||
|                                     }; | ||||
|                                     prefix.extend_from_slice(server.as_bytes()); | ||||
|                                     prefix.push(0xff); | ||||
| 
 | ||||
|                                     servercurrentpdus | ||||
|  | @ -201,7 +271,17 @@ impl Sending { | |||
|                                 servercurrentpdus.insert(&key, &[]).unwrap(); | ||||
|                                 servernamepduids.remove(&key).unwrap(); | ||||
| 
 | ||||
|                                 futures.push(Self::handle_event(server, is_appservice, vec![pdu_id.into()], &globals, &rooms, &appservice)); | ||||
|                                 futures.push( | ||||
|                                     Self::handle_event( | ||||
|                                         outgoing_kind, | ||||
|                                         vec![pdu_id.into()], | ||||
|                                         &rooms, | ||||
|                                         &globals, | ||||
|                                         &appservice, | ||||
|                                         &pusher, | ||||
|                                         &account_data | ||||
|                                     ) | ||||
|                                 ); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|  | @ -210,6 +290,22 @@ impl Sending { | |||
|         }); | ||||
|     } | ||||
| 
 | ||||
|     pub fn send_push_pdu(&self, pdu_id: &[u8]) -> Result<()> { | ||||
|         // Make sure we don't cause utf8 errors when parsing to a String...
 | ||||
|         let pduid = String::from_utf8_lossy(pdu_id).as_bytes().to_vec(); | ||||
| 
 | ||||
|         // these are valid ServerName chars
 | ||||
|         // (byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'.')
 | ||||
|         let mut key = b"$".to_vec(); | ||||
|         // keep each pdu push unique
 | ||||
|         key.extend_from_slice(pduid.as_slice()); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(pdu_id); | ||||
|         self.servernamepduids.insert(key, b"")?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> { | ||||
|         let mut key = server.as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|  | @ -230,14 +326,16 @@ impl Sending { | |||
|     } | ||||
| 
 | ||||
|     async fn handle_event( | ||||
|         server: Box<ServerName>, | ||||
|         is_appservice: bool, | ||||
|         kind: OutgoingKind, | ||||
|         pdu_ids: Vec<IVec>, | ||||
|         globals: &super::globals::Globals, | ||||
|         rooms: &super::rooms::Rooms, | ||||
|         appservice: &super::appservice::Appservice, | ||||
|     ) -> std::result::Result<(Box<ServerName>, bool), (Box<ServerName>, bool, Error)> { | ||||
|         if is_appservice { | ||||
|         rooms: &Rooms, | ||||
|         globals: &Globals, | ||||
|         appservice: &Appservice, | ||||
|         pusher: &PushData, | ||||
|         account_data: &AccountData, | ||||
|     ) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> { | ||||
|         match kind { | ||||
|             OutgoingKind::Appservice(server) => { | ||||
|                 let pdu_jsons = pdu_ids | ||||
|                     .iter() | ||||
|                     .map(|pdu_id| { | ||||
|  | @ -249,7 +347,7 @@ impl Sending { | |||
|                                     ( | ||||
|                                         server.clone(), | ||||
|                                         Error::bad_database( | ||||
|                                         "Event in servernamepduids not found in db.", | ||||
|                                             "[Appservice] Event in servernamepduids not found in ", | ||||
|                                         ), | ||||
|                                     ) | ||||
|                                 })? | ||||
|  | @ -270,24 +368,80 @@ impl Sending { | |||
|                     }, | ||||
|                 ) | ||||
|                 .await | ||||
|             .map(|_response| (server.clone(), is_appservice)) | ||||
|             .map_err(|e| (server, is_appservice, e)) | ||||
|         } else { | ||||
|                 .map(|_response| OutgoingKind::Appservice(server.clone())) | ||||
|                 .map_err(|e| (OutgoingKind::Appservice(server.clone()), e)) | ||||
|             } | ||||
|             OutgoingKind::Push(id) => { | ||||
|                 let pdus = pdu_ids | ||||
|                     .iter() | ||||
|                     .map(|pdu_id| { | ||||
|                         Ok::<_, (Vec<u8>, Error)>( | ||||
|                             rooms | ||||
|                                 .get_pdu_from_id(pdu_id) | ||||
|                                 .map_err(|e| (id.clone(), e))? | ||||
|                                 .ok_or_else(|| { | ||||
|                                     ( | ||||
|                                         id.clone(), | ||||
|                                         Error::bad_database( | ||||
|                                             "[Push] Event in servernamepduids not found in db.", | ||||
|                                         ), | ||||
|                                     ) | ||||
|                                 })?, | ||||
|                         ) | ||||
|                     }) | ||||
|                     .filter_map(|r| r.ok()) | ||||
|                     .collect::<Vec<_>>(); | ||||
|                 dbg!(&pdus); | ||||
|                 for pdu in &pdus { | ||||
|                     for user in rooms.room_members(&pdu.room_id) { | ||||
|                         dbg!(&user); | ||||
|                         let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?; | ||||
|                         for pusher in pusher | ||||
|                             .get_pusher(&user) | ||||
|                             .map_err(|e| (OutgoingKind::Push(id.clone()), e))? | ||||
|                         { | ||||
|                             let rules_for_user = account_data | ||||
|                                 .get::<push_rules::PushRulesEvent>( | ||||
|                                     None, | ||||
|                                     &user, | ||||
|                                     EventType::PushRules, | ||||
|                                 ) | ||||
|                                 .map_err(|e| (OutgoingKind::Push(id.clone()), e))? | ||||
|                                 .map(|ev| ev.content.global) | ||||
|                                 .unwrap_or_else(|| crate::push_rules::default_pushrules(&user)); | ||||
|                             dbg!(&pusher); | ||||
|                             dbg!(&rules_for_user); | ||||
| 
 | ||||
|                             crate::database::pusher::send_push_notice( | ||||
|                                 &user, | ||||
|                                 &pusher, | ||||
|                                 rules_for_user, | ||||
|                                 pdu, | ||||
|                             ) | ||||
|                             .await | ||||
|                             .map_err(|e| (OutgoingKind::Push(id.clone()), e))?; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 Ok(OutgoingKind::Push(id)) | ||||
|             } | ||||
|             OutgoingKind::Normal(server) => { | ||||
|                 let pdu_jsons = pdu_ids | ||||
|                     .iter() | ||||
|                     .map(|pdu_id| { | ||||
|                     Ok::<_, (Box<ServerName>, Error)>( | ||||
|                         Ok::<_, (OutgoingKind, Error)>( | ||||
|                             // TODO: check room version and remove event_id if needed
 | ||||
|                             serde_json::from_str( | ||||
|                                 PduEvent::convert_to_outgoing_federation_event( | ||||
|                                     rooms | ||||
|                                         .get_pdu_json_from_id(pdu_id) | ||||
|                                     .map_err(|e| (server.clone(), e))? | ||||
|                                         .map_err(|e| (OutgoingKind::Normal(server.clone()), e))? | ||||
|                                         .ok_or_else(|| { | ||||
|                                             ( | ||||
|                                             server.clone(), | ||||
|                                                 OutgoingKind::Normal(server.clone()), | ||||
|                                                 Error::bad_database( | ||||
|                                                 "Event in servernamepduids not found in db.", | ||||
|                                                     "[Normal] Event in servernamepduids not found in db.", | ||||
|                                                 ), | ||||
|                                             ) | ||||
|                                         })?, | ||||
|  | @ -313,12 +467,13 @@ impl Sending { | |||
|                     }, | ||||
|                 ) | ||||
|                 .await | ||||
|             .map(|_response| (server.clone(), is_appservice)) | ||||
|             .map_err(|e| (server, is_appservice, e)) | ||||
|                 .map(|_response| OutgoingKind::Normal(server.clone())) | ||||
|                 .map_err(|e| (OutgoingKind::Normal(server.clone()), e)) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn parse_servercurrentpdus(key: IVec) -> Result<(Box<ServerName>, IVec, bool)> { | ||||
|     fn parse_servercurrentpdus(key: IVec) -> Result<(OutgoingKind, IVec)> { | ||||
|         let mut parts = key.splitn(2, |&b| b == 0xff); | ||||
|         let server = parts.next().expect("splitn always returns one element"); | ||||
|         let pdu = parts | ||||
|  | @ -330,19 +485,26 @@ impl Sending { | |||
|         })?; | ||||
| 
 | ||||
|         // Appservices start with a plus
 | ||||
|         let (server, is_appservice) = if server.starts_with('+') { | ||||
|             (&server[1..], true) | ||||
|         } else { | ||||
|             (&*server, false) | ||||
|         }; | ||||
| 
 | ||||
|         Ok::<_, Error>(( | ||||
|             Box::<ServerName>::try_from(server).map_err(|_| { | ||||
|         Ok::<_, Error>(if server.starts_with('+') { | ||||
|             ( | ||||
|                 OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| { | ||||
|                     Error::bad_database("Invalid server string in server_currenttransaction") | ||||
|             })?, | ||||
|                 })?), | ||||
|                 IVec::from(pdu), | ||||
|             is_appservice, | ||||
|         )) | ||||
|             ) | ||||
|         } else if server.starts_with('$') { | ||||
|             ( | ||||
|                 OutgoingKind::Push(server.as_bytes().to_vec()), | ||||
|                 IVec::from(pdu), | ||||
|             ) | ||||
|         } else { | ||||
|             ( | ||||
|                 OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| { | ||||
|                     Error::bad_database("Invalid server string in server_currenttransaction") | ||||
|                 })?), | ||||
|                 IVec::from(pdu), | ||||
|             ) | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn send_federation_request<T: OutgoingRequest>( | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue