2020-12-19 15:00:11 +00:00
use std ::{
2021-08-24 17:10:31 +00:00
collections ::{ BTreeMap , HashMap , HashSet } ,
2021-04-12 10:40:16 +00:00
convert ::{ TryFrom , TryInto } ,
2021-02-11 12:16:14 +00:00
fmt ::Debug ,
2020-12-19 15:00:11 +00:00
sync ::Arc ,
2021-05-20 21:46:52 +00:00
time ::{ Duration , Instant } ,
2020-12-19 15:00:11 +00:00
} ;
2020-09-15 14:13:54 +00:00
2021-03-15 08:48:19 +00:00
use crate ::{
appservice_server , database ::pusher , server_server , utils , Database , Error , PduEvent , Result ,
} ;
2020-09-23 13:23:29 +00:00
use federation ::transactions ::send_transaction_message ;
2021-03-02 13:36:48 +00:00
use ring ::digest ;
2021-06-08 16:10:00 +00:00
use rocket ::futures ::{
channel ::mpsc ,
stream ::{ FuturesUnordered , StreamExt } ,
} ;
2020-12-08 09:33:44 +00:00
use ruma ::{
2021-05-17 08:25:27 +00:00
api ::{
appservice ,
federation ::{
self ,
2021-08-24 17:10:31 +00:00
transactions ::edu ::{
DeviceListUpdateContent , Edu , ReceiptContent , ReceiptData , ReceiptMap ,
} ,
2021-05-17 08:25:27 +00:00
} ,
OutgoingRequest ,
} ,
2021-08-24 17:10:31 +00:00
device_id ,
2021-05-17 08:25:27 +00:00
events ::{ push_rules , AnySyncEphemeralRoomEvent , EventType } ,
2021-05-20 21:46:52 +00:00
push ,
receipt ::ReceiptType ,
2021-08-24 17:10:31 +00:00
uint , MilliSecondsSinceUnixEpoch , ServerName , UInt , UserId ,
2020-12-08 09:33:44 +00:00
} ;
2021-07-14 07:07:08 +00:00
use tokio ::{
select ,
sync ::{ RwLock , Semaphore } ,
} ;
2021-07-29 06:36:01 +00:00
use tracing ::{ error , warn } ;
2021-01-27 02:54:35 +00:00
2021-06-08 16:10:00 +00:00
use super ::abstraction ::Tree ;
2021-01-27 02:54:35 +00:00
#[ derive(Clone, Debug, PartialEq, Eq, Hash) ]
pub enum OutgoingKind {
Appservice ( Box < ServerName > ) ,
2021-03-22 13:04:11 +00:00
Push ( Vec < u8 > , Vec < u8 > ) , // user and pushkey
2021-01-27 02:54:35 +00:00
Normal ( Box < ServerName > ) ,
}
2021-05-12 18:04:28 +00:00
impl OutgoingKind {
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(self)) ]
2021-05-12 18:04:28 +00:00
pub fn get_prefix ( & self ) -> Vec < u8 > {
let mut prefix = match self {
OutgoingKind ::Appservice ( server ) = > {
let mut p = b " + " . to_vec ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
OutgoingKind ::Push ( user , pushkey ) = > {
let mut p = b " $ " . to_vec ( ) ;
2021-09-13 17:45:56 +00:00
p . extend_from_slice ( user ) ;
2021-05-12 18:04:28 +00:00
p . push ( 0xff ) ;
2021-09-13 17:45:56 +00:00
p . extend_from_slice ( pushkey ) ;
2021-05-12 18:04:28 +00:00
p
}
OutgoingKind ::Normal ( server ) = > {
let mut p = Vec ::new ( ) ;
p . extend_from_slice ( server . as_bytes ( ) ) ;
p
}
} ;
prefix . push ( 0xff ) ;
prefix
}
}
/// Payload of a queued outgoing event.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SendingEventType {
    /// Id bytes of a PDU; the event itself is looked up in the database when sending.
    Pdu(Vec<u8>),
    /// Serialized EDU content, stored as the value of the queue entry.
    Edu(Vec<u8>),
}
2020-09-15 14:13:54 +00:00
pub struct Sending {
/// The state for a given state hash.
2021-06-08 16:10:00 +00:00
pub ( super ) servername_educount : Arc < dyn Tree > , // EduCount: Count of last EDU sync
2021-09-14 12:23:43 +00:00
pub ( super ) servernameevent_data : Arc < dyn Tree > , // ServernameEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content
2021-08-25 12:42:46 +00:00
pub ( super ) servercurrentevent_data : Arc < dyn Tree > , // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content
2020-12-19 15:00:11 +00:00
pub ( super ) maximum_requests : Arc < Semaphore > ,
2021-07-29 18:17:47 +00:00
pub sender : mpsc ::UnboundedSender < ( Vec < u8 > , Vec < u8 > ) > ,
2020-09-15 14:13:54 +00:00
}
2021-04-24 16:01:05 +00:00
/// Delivery state the handler task tracks per destination prefix.
enum TransactionStatus {
    /// A transaction for this destination is in flight.
    Running,
    /// The last transaction failed: number of times failed, time of last failure.
    Failed(u32, Instant),
    /// A failed transaction is being retried: number of times failed.
    Retrying(u32),
}
2020-09-15 14:13:54 +00:00
impl Sending {
2021-07-14 07:07:08 +00:00
pub fn start_handler (
& self ,
db : Arc < RwLock < Database > > ,
2021-07-29 18:17:47 +00:00
mut receiver : mpsc ::UnboundedReceiver < ( Vec < u8 > , Vec < u8 > ) > ,
2021-07-14 07:07:08 +00:00
) {
2020-09-15 14:13:54 +00:00
tokio ::spawn ( async move {
let mut futures = FuturesUnordered ::new ( ) ;
2020-09-23 13:23:29 +00:00
2021-04-24 16:01:05 +00:00
let mut current_transaction_status = HashMap ::< Vec < u8 > , TransactionStatus > ::new ( ) ;
2020-10-21 14:08:54 +00:00
2021-06-08 16:10:00 +00:00
// Retry requests we could not finish yet
2021-05-12 18:04:28 +00:00
let mut initial_transactions = HashMap ::< OutgoingKind , Vec < SendingEventType > > ::new ( ) ;
2021-07-14 07:07:08 +00:00
let guard = db . read ( ) . await ;
2021-07-29 18:17:47 +00:00
for ( key , outgoing_kind , event ) in guard
. sending
. servercurrentevent_data
. iter ( )
. filter_map ( | ( key , v ) | {
Self ::parse_servercurrentevent ( & key , v )
. ok ( )
. map ( | ( k , e ) | ( key , k , e ) )
} )
2020-10-21 14:08:54 +00:00
{
2021-04-24 16:01:05 +00:00
let entry = initial_transactions
. entry ( outgoing_kind . clone ( ) )
2021-02-26 12:24:07 +00:00
. or_insert_with ( Vec ::new ) ;
if entry . len ( ) > 30 {
2021-04-24 16:01:05 +00:00
warn! (
2021-05-12 18:04:28 +00:00
" Dropping some current events: {:?} {:?} {:?} " ,
key , outgoing_kind , event
2021-04-24 16:01:05 +00:00
) ;
2021-07-29 18:17:47 +00:00
guard . sending . servercurrentevent_data . remove ( & key ) . unwrap ( ) ;
2021-02-26 12:24:07 +00:00
continue ;
}
2021-05-12 18:04:28 +00:00
entry . push ( event ) ;
2020-10-21 14:08:54 +00:00
}
2021-07-14 07:07:08 +00:00
drop ( guard ) ;
2021-05-12 18:04:28 +00:00
for ( outgoing_kind , events ) in initial_transactions {
current_transaction_status
. insert ( outgoing_kind . get_prefix ( ) , TransactionStatus ::Running ) ;
2021-07-14 07:07:08 +00:00
futures . push ( Self ::handle_events (
outgoing_kind . clone ( ) ,
events ,
Arc ::clone ( & db ) ,
) ) ;
2020-10-21 14:08:54 +00:00
}
2020-09-15 14:13:54 +00:00
loop {
select! {
2020-12-08 09:33:44 +00:00
Some ( response ) = futures . next ( ) = > {
match response {
2021-01-27 02:54:35 +00:00
Ok ( outgoing_kind ) = > {
2021-07-14 07:07:08 +00:00
let guard = db . read ( ) . await ;
2021-05-12 18:04:28 +00:00
let prefix = outgoing_kind . get_prefix ( ) ;
2021-07-29 18:17:47 +00:00
for ( key , _ ) in guard . sending . servercurrentevent_data
2021-06-08 16:10:00 +00:00
. scan_prefix ( prefix . clone ( ) )
2020-10-21 14:08:54 +00:00
{
2021-07-29 18:17:47 +00:00
guard . sending . servercurrentevent_data . remove ( & key ) . unwrap ( ) ;
2020-10-21 14:08:54 +00:00
}
// Find events that have been added since starting the last request
2021-07-29 18:17:47 +00:00
let new_events = guard . sending . servernameevent_data
2021-06-08 16:10:00 +00:00
. scan_prefix ( prefix . clone ( ) )
2021-07-29 18:17:47 +00:00
. filter_map ( | ( k , v ) | {
Self ::parse_servercurrentevent ( & k , v ) . ok ( ) . map ( | ev | ( ev , k ) )
2020-11-03 20:20:35 +00:00
} )
2021-02-26 12:24:07 +00:00
. take ( 30 )
2020-11-03 20:20:35 +00:00
. collect ::< Vec < _ > > ( ) ;
2020-10-21 14:08:54 +00:00
2021-05-12 18:04:28 +00:00
// TODO: find edus
if ! new_events . is_empty ( ) {
// Insert pdus we found
2021-07-29 18:17:47 +00:00
for ( e , key ) in & new_events {
let value = if let SendingEventType ::Edu ( value ) = & e . 1 { & * * value } else { & [ ] } ;
2021-09-13 17:45:56 +00:00
guard . sending . servercurrentevent_data . insert ( key , value ) . unwrap ( ) ;
guard . sending . servernameevent_data . remove ( key ) . unwrap ( ) ;
2020-10-21 14:08:54 +00:00
}
2021-07-14 07:07:08 +00:00
drop ( guard ) ;
2021-01-27 02:54:35 +00:00
futures . push (
2021-05-12 18:04:28 +00:00
Self ::handle_events (
2021-01-27 02:54:35 +00:00
outgoing_kind . clone ( ) ,
2021-07-20 21:36:03 +00:00
new_events . into_iter ( ) . map ( | ( event , _ ) | event . 1 ) . collect ( ) ,
2021-07-14 07:07:08 +00:00
Arc ::clone ( & db ) ,
2021-01-27 02:54:35 +00:00
)
) ;
2020-10-21 14:08:54 +00:00
} else {
2021-04-24 16:01:05 +00:00
current_transaction_status . remove ( & prefix ) ;
2020-10-21 14:08:54 +00:00
}
2020-09-23 13:23:29 +00:00
}
2021-03-25 22:55:40 +00:00
Err ( ( outgoing_kind , _ ) ) = > {
2021-05-12 18:04:28 +00:00
current_transaction_status . entry ( outgoing_kind . get_prefix ( ) ) . and_modify ( | e | * e = match e {
2021-04-24 16:01:05 +00:00
TransactionStatus ::Running = > TransactionStatus ::Failed ( 1 , Instant ::now ( ) ) ,
TransactionStatus ::Retrying ( n ) = > TransactionStatus ::Failed ( * n + 1 , Instant ::now ( ) ) ,
TransactionStatus ::Failed ( _ , _ ) = > {
error! ( " Request that was not even running failed?! " ) ;
return
2020-12-19 15:00:11 +00:00
} ,
} ) ;
2020-09-23 13:23:29 +00:00
}
} ;
} ,
2021-07-29 18:17:47 +00:00
Some ( ( key , value ) ) = receiver . next ( ) = > {
if let Ok ( ( outgoing_kind , event ) ) = Self ::parse_servercurrentevent ( & key , value ) {
2021-07-14 07:07:08 +00:00
let guard = db . read ( ) . await ;
2021-06-08 16:10:00 +00:00
if let Ok ( Some ( events ) ) = Self ::select_events (
& outgoing_kind ,
vec! [ ( event , key ) ] ,
& mut current_transaction_status ,
2021-07-14 07:07:08 +00:00
& guard
2021-06-08 16:10:00 +00:00
) {
2021-07-14 07:07:08 +00:00
futures . push ( Self ::handle_events ( outgoing_kind , events , Arc ::clone ( & db ) ) ) ;
2020-09-15 14:13:54 +00:00
}
2020-09-23 13:23:29 +00:00
}
}
2020-09-15 14:13:54 +00:00
}
}
} ) ;
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(outgoing_kind, new_events, current_transaction_status, db)) ]
2021-05-12 18:04:28 +00:00
fn select_events (
outgoing_kind : & OutgoingKind ,
2021-06-08 16:10:00 +00:00
new_events : Vec < ( SendingEventType , Vec < u8 > ) > , // Events we want to send: event and full key
2021-05-12 18:04:28 +00:00
current_transaction_status : & mut HashMap < Vec < u8 > , TransactionStatus > ,
2021-05-17 08:25:27 +00:00
db : & Database ,
2021-06-08 16:10:00 +00:00
) -> Result < Option < Vec < SendingEventType > > > {
2021-05-12 18:04:28 +00:00
let mut retry = false ;
let mut allow = true ;
let prefix = outgoing_kind . get_prefix ( ) ;
let entry = current_transaction_status . entry ( prefix . clone ( ) ) ;
entry
. and_modify ( | e | match e {
TransactionStatus ::Running | TransactionStatus ::Retrying ( _ ) = > {
allow = false ; // already running
}
TransactionStatus ::Failed ( tries , time ) = > {
// Fail if a request has failed recently (exponential backoff)
let mut min_elapsed_duration = Duration ::from_secs ( 30 ) * ( * tries ) * ( * tries ) ;
if min_elapsed_duration > Duration ::from_secs ( 60 * 60 * 24 ) {
min_elapsed_duration = Duration ::from_secs ( 60 * 60 * 24 ) ;
}
if time . elapsed ( ) < min_elapsed_duration {
allow = false ;
} else {
retry = true ;
* e = TransactionStatus ::Retrying ( * tries ) ;
}
}
} )
. or_insert ( TransactionStatus ::Running ) ;
if ! allow {
2021-06-08 16:10:00 +00:00
return Ok ( None ) ;
2021-05-12 18:04:28 +00:00
}
let mut events = Vec ::new ( ) ;
if retry {
// We retry the previous transaction
2021-07-29 18:17:47 +00:00
for ( key , value ) in db . sending . servercurrentevent_data . scan_prefix ( prefix ) {
if let Ok ( ( _ , e ) ) = Self ::parse_servercurrentevent ( & key , value ) {
2021-05-12 18:04:28 +00:00
events . push ( e ) ;
}
}
} else {
for ( e , full_key ) in new_events {
2021-07-29 18:17:47 +00:00
let value = if let SendingEventType ::Edu ( value ) = & e {
& * * value
} else {
& [ ] [ .. ]
} ;
db . sending
. servercurrentevent_data
. insert ( & full_key , value ) ? ;
2021-05-12 18:04:28 +00:00
// If it was a PDU we have to unqueue it
// TODO: don't try to unqueue EDUs
2021-07-29 18:17:47 +00:00
db . sending . servernameevent_data . remove ( & full_key ) ? ;
2021-05-12 18:04:28 +00:00
events . push ( e ) ;
}
2021-05-17 08:25:27 +00:00
2021-05-20 21:46:52 +00:00
if let OutgoingKind ::Normal ( server_name ) = outgoing_kind {
if let Ok ( ( select_edus , last_count ) ) = Self ::select_edus ( db , server_name ) {
2021-07-20 19:17:15 +00:00
events . extend ( select_edus . into_iter ( ) . map ( SendingEventType ::Edu ) ) ;
2021-05-20 21:46:52 +00:00
db . sending
. servername_educount
2021-06-08 16:10:00 +00:00
. insert ( server_name . as_bytes ( ) , & last_count . to_be_bytes ( ) ) ? ;
2021-05-17 08:25:27 +00:00
}
}
2021-05-12 18:04:28 +00:00
}
2021-06-08 16:10:00 +00:00
Ok ( Some ( events ) )
2021-05-12 18:04:28 +00:00
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(db, server)) ]
2021-07-20 19:17:15 +00:00
pub fn select_edus ( db : & Database , server : & ServerName ) -> Result < ( Vec < Vec < u8 > > , u64 ) > {
2021-05-17 08:25:27 +00:00
// u64: count of last edu
let since = db
. sending
. servername_educount
. get ( server . as_bytes ( ) ) ?
. map_or ( Ok ( 0 ) , | bytes | {
utils ::u64_from_bytes ( & bytes )
. map_err ( | _ | Error ::bad_database ( " Invalid u64 in servername_educount. " ) )
} ) ? ;
let mut events = Vec ::new ( ) ;
let mut max_edu_count = since ;
2021-08-24 17:10:31 +00:00
let mut device_list_changes = HashSet ::new ( ) ;
2021-05-17 08:25:27 +00:00
' outer : for room_id in db . rooms . server_rooms ( server ) {
let room_id = room_id ? ;
2021-08-24 17:10:31 +00:00
// Look for device list updates in this room
device_list_changes . extend (
db . users
. keys_changed ( & room_id . to_string ( ) , since , None )
. filter_map ( | r | r . ok ( ) )
. filter ( | user_id | user_id . server_name ( ) = = db . globals . server_name ( ) ) ,
) ;
// Look for read receipts in this room
2021-06-08 16:10:00 +00:00
for r in db . rooms . edus . readreceipts_since ( & room_id , since ) {
2021-05-17 08:25:27 +00:00
let ( user_id , count , read_receipt ) = r ? ;
if count > max_edu_count {
max_edu_count = count ;
}
if user_id . server_name ( ) ! = db . globals . server_name ( ) {
continue ;
}
let event =
2021-09-13 17:45:56 +00:00
serde_json ::from_str ::< AnySyncEphemeralRoomEvent > ( read_receipt . json ( ) . get ( ) )
2021-05-17 08:25:27 +00:00
. map_err ( | _ | Error ::bad_database ( " Invalid edu event in read_receipts. " ) ) ? ;
let federation_event = match event {
AnySyncEphemeralRoomEvent ::Receipt ( r ) = > {
let mut read = BTreeMap ::new ( ) ;
2021-05-20 21:46:52 +00:00
let ( event_id , mut receipt ) = r
2021-05-17 08:25:27 +00:00
. content
. 0
. into_iter ( )
. next ( )
. expect ( " we only use one event per read receipt " ) ;
let receipt = receipt
2021-05-20 21:46:52 +00:00
. remove ( & ReceiptType ::Read )
2021-05-17 08:25:27 +00:00
. expect ( " our read receipts always set this " )
. remove ( & user_id )
. expect ( " our read receipts always have the user here " ) ;
read . insert (
user_id ,
ReceiptData {
data : receipt . clone ( ) ,
event_ids : vec ! [ event_id . clone ( ) ] ,
} ,
) ;
let receipt_map = ReceiptMap { read } ;
let mut receipts = BTreeMap ::new ( ) ;
receipts . insert ( room_id . clone ( ) , receipt_map ) ;
Edu ::Receipt ( ReceiptContent { receipts } )
}
_ = > {
Error ::bad_database ( " Invalid event type in read_receipts " ) ;
continue ;
}
} ;
2021-07-20 19:17:15 +00:00
events . push ( serde_json ::to_vec ( & federation_event ) . expect ( " json can be serialized " ) ) ;
2021-05-17 08:25:27 +00:00
if events . len ( ) > = 20 {
break 'outer ;
}
}
}
2021-08-24 17:10:31 +00:00
for user_id in device_list_changes {
// Empty prev id forces synapse to resync: https://github.com/matrix-org/synapse/blob/98aec1cc9da2bd6b8e34ffb282c85abf9b8b42ca/synapse/handlers/device.py#L767
// Because synapse resyncs, we can just insert dummy data
let edu = Edu ::DeviceListUpdate ( DeviceListUpdateContent {
user_id ,
device_id : device_id ! ( " dummy " ) ,
device_display_name : " Dummy " . to_owned ( ) ,
stream_id : uint ! ( 1 ) ,
prev_id : Vec ::new ( ) ,
deleted : None ,
keys : None ,
} ) ;
events . push ( serde_json ::to_vec ( & edu ) . expect ( " json can be serialized " ) ) ;
}
2021-05-17 08:25:27 +00:00
Ok ( ( events , max_edu_count ) )
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(self, pdu_id, senderkey)) ]
2021-07-14 07:07:08 +00:00
pub fn send_push_pdu ( & self , pdu_id : & [ u8 ] , senderkey : Vec < u8 > ) -> Result < ( ) > {
2021-01-27 02:54:35 +00:00
let mut key = b " $ " . to_vec ( ) ;
2021-03-22 13:04:11 +00:00
key . extend_from_slice ( & senderkey ) ;
2021-01-27 02:54:35 +00:00
key . push ( 0xff ) ;
key . extend_from_slice ( pdu_id ) ;
2021-07-29 18:17:47 +00:00
self . servernameevent_data . insert ( & key , & [ ] ) ? ;
self . sender . unbounded_send ( ( key , vec! [ ] ) ) . unwrap ( ) ;
2021-01-27 02:54:35 +00:00
Ok ( ( ) )
}
2021-09-14 12:23:43 +00:00
/// Queues a PDU for every server in `servers` and wakes the handler task.
///
/// Each key is the server name, `0xff`, `pdu_id`; the stored value is empty,
/// which marks the entry as a PDU.
///
/// The whole batch is written to `servernameevent_data` before any
/// notification is sent. The handler therefore never sees a key that is not
/// persisted yet, and nothing is announced if the batch insert fails. This
/// matches the persist-then-notify order of the other `send_*` methods.
///
/// # Errors
/// Returns the database error if the batch cannot be written.
///
/// # Panics
/// Panics if the handler channel is closed.
#[tracing::instrument(skip(self, servers, pdu_id))]
pub fn send_pdu<I: Iterator<Item = Box<ServerName>>>(
    &self,
    servers: I,
    pdu_id: &[u8],
) -> Result<()> {
    let keys: Vec<Vec<u8>> = servers
        .map(|server| {
            let mut key = server.as_bytes().to_vec();
            key.push(0xff);
            key.extend_from_slice(pdu_id);
            key
        })
        .collect();

    // Persist first ...
    let mut batch = keys.iter().cloned().map(|key| (key, Vec::new()));
    self.servernameevent_data.insert_batch(&mut batch)?;

    // ... then notify, one message per destination.
    for key in keys {
        self.sender.unbounded_send((key, vec![])).unwrap();
    }

    Ok(())
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(self, server, serialized)) ]
2021-08-25 12:42:46 +00:00
pub fn send_reliable_edu (
& self ,
server : & ServerName ,
serialized : Vec < u8 > ,
id : u64 ,
) -> Result < ( ) > {
2021-07-20 19:17:15 +00:00
let mut key = server . as_bytes ( ) . to_vec ( ) ;
key . push ( 0xff ) ;
2021-08-25 12:42:46 +00:00
key . extend_from_slice ( & id . to_be_bytes ( ) ) ;
2021-07-29 18:17:47 +00:00
self . servernameevent_data . insert ( & key , & serialized ) ? ;
self . sender . unbounded_send ( ( key , serialized ) ) . unwrap ( ) ;
2020-09-15 14:13:54 +00:00
Ok ( ( ) )
}
2020-09-23 13:23:29 +00:00
2021-02-28 11:41:03 +00:00
#[ tracing::instrument(skip(self)) ]
2020-12-08 09:33:44 +00:00
pub fn send_pdu_appservice ( & self , appservice_id : & str , pdu_id : & [ u8 ] ) -> Result < ( ) > {
2021-01-05 14:21:41 +00:00
let mut key = b " + " . to_vec ( ) ;
2020-12-08 09:33:44 +00:00
key . extend_from_slice ( appservice_id . as_bytes ( ) ) ;
key . push ( 0xff ) ;
key . extend_from_slice ( pdu_id ) ;
2021-07-29 18:17:47 +00:00
self . servernameevent_data . insert ( & key , & [ ] ) ? ;
self . sender . unbounded_send ( ( key , vec! [ ] ) ) . unwrap ( ) ;
2020-12-08 09:33:44 +00:00
Ok ( ( ) )
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(keys)) ]
2021-05-12 18:04:28 +00:00
fn calculate_hash ( keys : & [ & [ u8 ] ] ) -> Vec < u8 > {
2021-03-02 13:36:48 +00:00
// We only hash the pdu's event ids, not the whole pdu
let bytes = keys . join ( & 0xff ) ;
let hash = digest ::digest ( & digest ::SHA256 , & bytes ) ;
hash . as_ref ( ) . to_owned ( )
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(db, events, kind)) ]
2021-05-12 18:04:28 +00:00
async fn handle_events (
2021-01-27 02:54:35 +00:00
kind : OutgoingKind ,
2021-05-12 18:04:28 +00:00
events : Vec < SendingEventType > ,
2021-07-14 07:07:08 +00:00
db : Arc < RwLock < Database > > ,
2021-01-27 02:54:35 +00:00
) -> std ::result ::Result < OutgoingKind , ( OutgoingKind , Error ) > {
2021-07-14 07:07:08 +00:00
let db = db . read ( ) . await ;
2021-03-16 17:00:26 +00:00
match & kind {
2021-01-27 02:54:35 +00:00
OutgoingKind ::Appservice ( server ) = > {
2021-05-12 18:04:28 +00:00
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
pdu_jsons . push ( db . rooms
2021-09-13 17:45:56 +00:00
. get_pdu_from_id ( pdu_id )
2021-05-12 18:04:28 +00:00
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
2021-01-27 02:54:35 +00:00
. ok_or_else ( | | {
(
2021-05-12 18:04:28 +00:00
kind . clone ( ) ,
2021-01-27 02:54:35 +00:00
Error ::bad_database (
2021-07-29 18:17:47 +00:00
" [Appservice] Event in servernameevent_data not found in db. " ,
2021-01-27 02:54:35 +00:00
) ,
)
} ) ?
2021-05-20 21:46:52 +00:00
. to_room_event ( ) )
2021-05-12 18:04:28 +00:00
}
SendingEventType ::Edu ( _ ) = > {
// Appservices don't need EDUs (?)
}
}
}
2021-03-15 08:48:19 +00:00
let permit = db . sending . maximum_requests . acquire ( ) . await ;
2021-03-22 13:04:11 +00:00
2021-03-15 08:48:19 +00:00
let response = appservice_server ::send_request (
2021-01-29 15:14:09 +00:00
& db . globals ,
db . appservice
2021-01-27 02:54:35 +00:00
. get_registration ( server . as_str ( ) )
. unwrap ( )
. unwrap ( ) , // TODO: handle error
appservice ::event ::push_events ::v1 ::Request {
events : & pdu_jsons ,
2021-03-15 08:48:19 +00:00
txn_id : & base64 ::encode_config (
2021-05-12 18:04:28 +00:00
Self ::calculate_hash (
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
2021-03-15 08:48:19 +00:00
base64 ::URL_SAFE_NO_PAD ,
) ,
2021-01-27 02:54:35 +00:00
} ,
)
. await
2021-03-15 08:48:19 +00:00
. map ( | _response | kind . clone ( ) )
. map_err ( | e | ( kind , e ) ) ;
2021-02-26 12:24:07 +00:00
2021-03-15 08:48:19 +00:00
drop ( permit ) ;
2021-02-26 12:24:07 +00:00
2021-03-15 08:48:19 +00:00
response
2021-01-27 02:54:35 +00:00
}
2021-03-22 13:04:11 +00:00
OutgoingKind ::Push ( user , pushkey ) = > {
2021-05-12 18:04:28 +00:00
let mut pdus = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
pdus . push (
db . rooms
2021-09-13 17:45:56 +00:00
. get_pdu_from_id ( pdu_id )
2021-05-12 18:04:28 +00:00
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. ok_or_else ( | | {
(
kind . clone ( ) ,
Error ::bad_database (
2021-07-29 18:17:47 +00:00
" [Push] Event in servernamevent_datas not found in db. " ,
2021-05-12 18:04:28 +00:00
) ,
)
} ) ? ,
) ;
}
SendingEventType ::Edu ( _ ) = > {
// Push gateways don't need EDUs (?)
}
}
}
2021-01-29 15:14:09 +00:00
2021-03-16 17:00:26 +00:00
for pdu in pdus {
2021-01-29 15:14:09 +00:00
// Redacted events are not notification targets (we don't send push for them)
if pdu . unsigned . get ( " redacted_because " ) . is_some ( ) {
continue ;
}
2021-03-24 10:52:10 +00:00
let userid =
UserId ::try_from ( utils ::string_from_bytes ( user ) . map_err ( | _ | {
(
2021-05-12 18:04:28 +00:00
kind . clone ( ) ,
2021-03-24 10:52:10 +00:00
Error ::bad_database ( " Invalid push user string in db. " ) ,
)
} ) ? )
. map_err ( | _ | {
(
2021-05-12 18:04:28 +00:00
kind . clone ( ) ,
2021-03-24 10:52:10 +00:00
Error ::bad_database ( " Invalid push user id in db. " ) ,
)
} ) ? ;
2021-03-22 13:04:11 +00:00
let mut senderkey = user . clone ( ) ;
senderkey . push ( 0xff ) ;
senderkey . extend_from_slice ( pushkey ) ;
let pusher = match db
. pusher
. get_pusher ( & senderkey )
. map_err ( | e | ( OutgoingKind ::Push ( user . clone ( ) , pushkey . clone ( ) ) , e ) ) ?
{
Some ( pusher ) = > pusher ,
None = > continue ,
} ;
let rules_for_user = db
. account_data
. get ::< push_rules ::PushRulesEvent > ( None , & userid , EventType ::PushRules )
2021-04-12 10:40:16 +00:00
. unwrap_or_default ( )
2021-03-22 13:04:11 +00:00
. map ( | ev | ev . content . global )
2021-04-05 19:25:10 +00:00
. unwrap_or_else ( | | push ::Ruleset ::server_default ( & userid ) ) ;
2021-03-22 13:04:11 +00:00
2021-04-12 10:40:16 +00:00
let unread : UInt = db
2021-03-22 13:04:11 +00:00
. rooms
2021-04-12 10:40:16 +00:00
. notification_count ( & userid , & pdu . room_id )
. map_err ( | e | ( kind . clone ( ) , e ) ) ?
. try_into ( )
. expect ( " notifiation count can't go that high " ) ;
2021-03-22 13:04:11 +00:00
let permit = db . sending . maximum_requests . acquire ( ) . await ;
let _response = pusher ::send_push_notice (
& userid ,
unread ,
& pusher ,
rules_for_user ,
& pdu ,
2021-07-14 07:07:08 +00:00
& db ,
2021-03-22 13:04:11 +00:00
)
. await
. map ( | _response | kind . clone ( ) )
. map_err ( | e | ( kind . clone ( ) , e ) ) ;
drop ( permit ) ;
2021-01-27 02:54:35 +00:00
}
2021-03-22 13:04:11 +00:00
Ok ( OutgoingKind ::Push ( user . clone ( ) , pushkey . clone ( ) ) )
2021-01-27 02:54:35 +00:00
}
OutgoingKind ::Normal ( server ) = > {
2021-05-12 18:04:28 +00:00
let mut edu_jsons = Vec ::new ( ) ;
let mut pdu_jsons = Vec ::new ( ) ;
for event in & events {
match event {
SendingEventType ::Pdu ( pdu_id ) = > {
2021-01-27 02:54:35 +00:00
// TODO: check room version and remove event_id if needed
2021-07-30 16:05:26 +00:00
let raw = PduEvent ::convert_to_outgoing_federation_event (
db . rooms
2021-09-13 17:45:56 +00:00
. get_pdu_json_from_id ( pdu_id )
2021-07-30 16:05:26 +00:00
. map_err ( | e | ( OutgoingKind ::Normal ( server . clone ( ) ) , e ) ) ?
. ok_or_else ( | | {
(
OutgoingKind ::Normal ( server . clone ( ) ) ,
Error ::bad_database (
" [Normal] Event in servernamevent_datas not found in db. " ,
) ,
)
} ) ? ,
2021-09-22 10:27:21 +00:00
) . into_json ( ) ;
2021-07-30 16:05:26 +00:00
pdu_jsons . push ( raw ) ;
2021-05-12 18:04:28 +00:00
}
SendingEventType ::Edu ( edu ) = > {
2021-07-30 16:05:26 +00:00
if let Ok ( raw ) = serde_json ::from_slice ( edu ) {
edu_jsons . push ( raw ) ;
}
2021-05-12 18:04:28 +00:00
}
}
}
2021-01-27 02:54:35 +00:00
2021-03-15 08:48:19 +00:00
let permit = db . sending . maximum_requests . acquire ( ) . await ;
let response = server_server ::send_request (
2021-01-29 15:14:09 +00:00
& db . globals ,
2021-01-27 02:54:35 +00:00
& * server ,
send_transaction_message ::v1 ::Request {
2021-01-29 15:14:09 +00:00
origin : db . globals . server_name ( ) ,
2021-01-27 02:54:35 +00:00
pdus : & pdu_jsons ,
2021-05-12 18:04:28 +00:00
edus : & edu_jsons ,
2021-05-20 21:46:52 +00:00
origin_server_ts : MilliSecondsSinceUnixEpoch ::now ( ) ,
2021-03-15 08:48:19 +00:00
transaction_id : & base64 ::encode_config (
2021-05-12 18:04:28 +00:00
Self ::calculate_hash (
& events
. iter ( )
. map ( | e | match e {
SendingEventType ::Edu ( b ) | SendingEventType ::Pdu ( b ) = > & * * b ,
} )
. collect ::< Vec < _ > > ( ) ,
) ,
2021-03-15 08:48:19 +00:00
base64 ::URL_SAFE_NO_PAD ,
) ,
2021-01-27 02:54:35 +00:00
} ,
)
. await
2021-03-15 08:48:19 +00:00
. map ( | response | {
2021-03-26 10:10:45 +00:00
for pdu in response . pdus {
if pdu . 1. is_err ( ) {
warn! ( " Failed to send to {}: {:?} " , server , pdu ) ;
}
}
2021-03-15 08:48:19 +00:00
kind . clone ( )
2020-12-08 09:33:44 +00:00
} )
2021-03-15 08:48:19 +00:00
. map_err ( | e | ( kind , e ) ) ;
2021-02-26 12:24:07 +00:00
2021-03-15 08:48:19 +00:00
drop ( permit ) ;
2021-02-26 12:24:07 +00:00
2021-03-15 08:48:19 +00:00
response
2021-01-27 02:54:35 +00:00
}
2020-12-08 09:33:44 +00:00
}
2020-09-23 13:23:29 +00:00
}
2020-12-19 15:00:11 +00:00
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(key)) ]
2021-07-29 18:17:47 +00:00
fn parse_servercurrentevent (
key : & [ u8 ] ,
value : Vec < u8 > ,
) -> Result < ( OutgoingKind , SendingEventType ) > {
2021-03-22 13:04:11 +00:00
// Appservices start with a plus
Ok ::< _ , Error > ( if key . starts_with ( b " + " ) {
let mut parts = key [ 1 .. ] . splitn ( 2 , | & b | b = = 0xff ) ;
2020-12-19 15:00:11 +00:00
2021-03-22 13:04:11 +00:00
let server = parts . next ( ) . expect ( " splitn always returns one element " ) ;
2021-05-12 18:04:28 +00:00
let event = parts
2021-03-22 13:04:11 +00:00
. next ( )
. ok_or_else ( | | Error ::bad_database ( " Invalid bytes in servercurrentpdus. " ) ) ? ;
2021-09-13 17:45:56 +00:00
let server = utils ::string_from_bytes ( server ) . map_err ( | _ | {
2021-03-22 13:04:11 +00:00
Error ::bad_database ( " Invalid server bytes in server_currenttransaction " )
} ) ? ;
2020-12-19 15:00:11 +00:00
2021-01-27 02:54:35 +00:00
(
OutgoingKind ::Appservice ( Box ::< ServerName > ::try_from ( server ) . map_err ( | _ | {
Error ::bad_database ( " Invalid server string in server_currenttransaction " )
} ) ? ) ,
2021-08-25 12:42:46 +00:00
if value . is_empty ( ) {
2021-05-12 18:04:28 +00:00
SendingEventType ::Pdu ( event . to_vec ( ) )
2021-08-25 12:42:46 +00:00
} else {
SendingEventType ::Edu ( value )
2021-05-12 18:04:28 +00:00
} ,
2021-01-27 02:54:35 +00:00
)
2021-03-22 13:04:11 +00:00
} else if key . starts_with ( b " $ " ) {
let mut parts = key [ 1 .. ] . splitn ( 3 , | & b | b = = 0xff ) ;
let user = parts . next ( ) . expect ( " splitn always returns one element " ) ;
let pushkey = parts
. next ( )
. ok_or_else ( | | Error ::bad_database ( " Invalid bytes in servercurrentpdus. " ) ) ? ;
2021-05-12 18:04:28 +00:00
let event = parts
2021-03-22 13:04:11 +00:00
. next ( )
. ok_or_else ( | | Error ::bad_database ( " Invalid bytes in servercurrentpdus. " ) ) ? ;
2021-01-27 02:54:35 +00:00
(
2021-03-22 13:04:11 +00:00
OutgoingKind ::Push ( user . to_vec ( ) , pushkey . to_vec ( ) ) ,
2021-08-25 12:42:46 +00:00
if value . is_empty ( ) {
2021-05-12 18:04:28 +00:00
SendingEventType ::Pdu ( event . to_vec ( ) )
2021-08-25 12:42:46 +00:00
} else {
SendingEventType ::Edu ( value )
2021-05-12 18:04:28 +00:00
} ,
2021-01-27 02:54:35 +00:00
)
2020-12-19 15:00:11 +00:00
} else {
2021-03-22 13:04:11 +00:00
let mut parts = key . splitn ( 2 , | & b | b = = 0xff ) ;
let server = parts . next ( ) . expect ( " splitn always returns one element " ) ;
2021-05-12 18:04:28 +00:00
let event = parts
2021-03-22 13:04:11 +00:00
. next ( )
. ok_or_else ( | | Error ::bad_database ( " Invalid bytes in servercurrentpdus. " ) ) ? ;
2021-09-13 17:45:56 +00:00
let server = utils ::string_from_bytes ( server ) . map_err ( | _ | {
2021-03-22 13:04:11 +00:00
Error ::bad_database ( " Invalid server bytes in server_currenttransaction " )
} ) ? ;
2021-01-27 02:54:35 +00:00
(
OutgoingKind ::Normal ( Box ::< ServerName > ::try_from ( server ) . map_err ( | _ | {
Error ::bad_database ( " Invalid server string in server_currenttransaction " )
} ) ? ) ,
2021-08-25 12:42:46 +00:00
if value . is_empty ( ) {
2021-05-12 18:04:28 +00:00
SendingEventType ::Pdu ( event . to_vec ( ) )
2021-08-25 12:42:46 +00:00
} else {
SendingEventType ::Edu ( value )
2021-05-12 18:04:28 +00:00
} ,
2021-01-27 02:54:35 +00:00
)
} )
2020-12-19 15:00:11 +00:00
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(self, globals, destination, request)) ]
2020-12-19 15:00:11 +00:00
pub async fn send_federation_request < T : OutgoingRequest > (
& self ,
globals : & crate ::database ::globals ::Globals ,
2021-01-14 19:39:56 +00:00
destination : & ServerName ,
2020-12-19 15:00:11 +00:00
request : T ,
) -> Result < T ::IncomingResponse >
where
T : Debug ,
{
let permit = self . maximum_requests . acquire ( ) . await ;
let response = server_server ::send_request ( globals , destination , request ) . await ;
drop ( permit ) ;
response
}
2021-07-29 06:36:01 +00:00
#[ tracing::instrument(skip(self, globals, registration, request)) ]
2020-12-19 15:00:11 +00:00
pub async fn send_appservice_request < T : OutgoingRequest > (
& self ,
globals : & crate ::database ::globals ::Globals ,
registration : serde_yaml ::Value ,
request : T ,
) -> Result < T ::IncomingResponse >
where
T : Debug ,
{
let permit = self . maximum_requests . acquire ( ) . await ;
let response = appservice_server ::send_request ( globals , registration , request ) . await ;
drop ( permit ) ;
response
}
2020-09-15 14:13:54 +00:00
}