Merge branch 'improvements' into 'master'
improvement: bug fixes and refactors See merge request famedly/conduit!78
This commit is contained in:
		
						commit
						68c5c57486
					
				
					 9 changed files with 356 additions and 277 deletions
				
			
		|  | @ -106,3 +106,7 @@ conf-files = [ | |||
| ] | ||||
| maintainer-scripts = "debian/" | ||||
| systemd-units = { unit-name = "matrix-conduit" } | ||||
| 
 | ||||
| # For flamegraphs: | ||||
| #[profile.release] | ||||
| #debug = true | ||||
|  |  | |||
|  | @ -568,7 +568,13 @@ async fn join_room_by_id_helper( | |||
|         { | ||||
|             let (event_id, value) = match result { | ||||
|                 Ok(t) => t, | ||||
|                 Err(_) => continue, | ||||
|                 Err(e) => { | ||||
|                     warn!( | ||||
|                         "PDU could not be verified: {:?} {:?} {:?}", | ||||
|                         e, event_id, pdu | ||||
|                     ); | ||||
|                     continue; | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             let pdu = PduEvent::from_id_val(&event_id, value.clone()).map_err(|e| { | ||||
|  |  | |||
|  | @ -111,11 +111,7 @@ pub async fn create_room_route( | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let power_levels_content = if let Some(power_levels) = &body.power_level_content_override { | ||||
|         serde_json::from_str(power_levels.json().get()).map_err(|_| { | ||||
|             Error::BadRequest(ErrorKind::BadJson, "Invalid power_level_content_override.") | ||||
|         })? | ||||
|     } else { | ||||
|     let mut power_levels_content = | ||||
|         serde_json::to_value(ruma::events::room::power_levels::PowerLevelsEventContent { | ||||
|             ban: 50.into(), | ||||
|             events: BTreeMap::new(), | ||||
|  | @ -130,8 +126,21 @@ pub async fn create_room_route( | |||
|                 room: 50.into(), | ||||
|             }, | ||||
|         }) | ||||
|         .expect("event is valid, we just created it") | ||||
|     }; | ||||
|         .expect("event is valid, we just created it"); | ||||
| 
 | ||||
|     if let Some(power_level_content_override) = &body.power_level_content_override { | ||||
|         let json = serde_json::from_str::<serde_json::Map<String, serde_json::Value>>( | ||||
|             power_level_content_override.json().get(), | ||||
|         ) | ||||
|         .map_err(|_| { | ||||
|             Error::BadRequest(ErrorKind::BadJson, "Invalid power_level_content_override.") | ||||
|         })?; | ||||
| 
 | ||||
|         for (key, value) in json { | ||||
|             power_levels_content[key] = value; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     db.rooms.build_and_append_pdu( | ||||
|         PduBuilder { | ||||
|             event_type: EventType::RoomPowerLevels, | ||||
|  |  | |||
|  | @ -198,7 +198,7 @@ impl Database { | |||
|             }, | ||||
|             sending: sending::Sending { | ||||
|                 servernamepduids: db.open_tree("servernamepduids")?, | ||||
|                 servercurrentpdus: db.open_tree("servercurrentpdus")?, | ||||
|                 servercurrentevents: db.open_tree("servercurrentevents")?, | ||||
|                 maximum_requests: Arc::new(Semaphore::new(config.max_concurrent_requests as usize)), | ||||
|             }, | ||||
|             admin: admin::Admin { | ||||
|  | @ -217,6 +217,9 @@ impl Database { | |||
|             _db: db, | ||||
|         }; | ||||
| 
 | ||||
|         // This data is probably outdated
 | ||||
|         db.rooms.edus.presenceid_presence.clear()?; | ||||
| 
 | ||||
|         db.admin.start_handler(db.clone(), admin_receiver); | ||||
| 
 | ||||
|         Ok(db) | ||||
|  |  | |||
|  | @ -6,6 +6,7 @@ use ruma::{ | |||
|     }, | ||||
|     presence::PresenceState, | ||||
|     serde::Raw, | ||||
|     signatures::CanonicalJsonObject, | ||||
|     RoomId, UInt, UserId, | ||||
| }; | ||||
| use std::{ | ||||
|  | @ -88,9 +89,13 @@ impl RoomEdus { | |||
|             .filter_map(|r| r.ok()) | ||||
|             .take_while(move |(k, _)| k.starts_with(&prefix)) | ||||
|             .map(|(_, v)| { | ||||
|                 Ok(serde_json::from_slice(&v).map_err(|_| { | ||||
|                     Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid.") | ||||
|                 })?) | ||||
|                 let mut json = serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| { | ||||
|                     Error::bad_database("Read receipt in roomlatestid_roomlatest is invalid json.") | ||||
|                 })?; | ||||
|                 json.remove("room_id"); | ||||
|                 Ok(Raw::from_json( | ||||
|                     serde_json::value::to_raw_value(&json).expect("json is valid raw value"), | ||||
|                 )) | ||||
|             })) | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -28,11 +28,44 @@ pub enum OutgoingKind { | |||
|     Normal(Box<ServerName>), | ||||
| } | ||||
| 
 | ||||
| impl OutgoingKind { | ||||
|     pub fn get_prefix(&self) -> Vec<u8> { | ||||
|         let mut prefix = match self { | ||||
|             OutgoingKind::Appservice(server) => { | ||||
|                 let mut p = b"+".to_vec(); | ||||
|                 p.extend_from_slice(server.as_bytes()); | ||||
|                 p | ||||
|             } | ||||
|             OutgoingKind::Push(user, pushkey) => { | ||||
|                 let mut p = b"$".to_vec(); | ||||
|                 p.extend_from_slice(&user); | ||||
|                 p.push(0xff); | ||||
|                 p.extend_from_slice(&pushkey); | ||||
|                 p | ||||
|             } | ||||
|             OutgoingKind::Normal(server) => { | ||||
|                 let mut p = Vec::new(); | ||||
|                 p.extend_from_slice(server.as_bytes()); | ||||
|                 p | ||||
|             } | ||||
|         }; | ||||
|         prefix.push(0xff); | ||||
| 
 | ||||
|         prefix | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug, PartialEq, Eq, Hash)] | ||||
| pub enum SendingEventType { | ||||
|     Pdu(Vec<u8>), | ||||
|     Edu(Vec<u8>), | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Sending { | ||||
|     /// The state for a given state hash.
 | ||||
|     pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
 | ||||
|     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId
 | ||||
|     pub(super) servercurrentevents: sled::Tree, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / (*)EduEvent
 | ||||
|     pub(super) maximum_requests: Arc<Semaphore>, | ||||
| } | ||||
| 
 | ||||
|  | @ -45,7 +78,7 @@ enum TransactionStatus { | |||
| impl Sending { | ||||
|     pub fn start_handler(&self, db: &Database) { | ||||
|         let servernamepduids = self.servernamepduids.clone(); | ||||
|         let servercurrentpdus = self.servercurrentpdus.clone(); | ||||
|         let servercurrentevents = self.servercurrentevents.clone(); | ||||
| 
 | ||||
|         let db = db.clone(); | ||||
| 
 | ||||
|  | @ -56,14 +89,14 @@ impl Sending { | |||
|             let mut subscriber = servernamepduids.watch_prefix(b""); | ||||
|             let mut current_transaction_status = HashMap::<Vec<u8>, TransactionStatus>::new(); | ||||
| 
 | ||||
|             let mut initial_transactions = HashMap::<OutgoingKind, Vec<Vec<u8>>>::new(); | ||||
|             for (key, outgoing_kind, pdu) in servercurrentpdus | ||||
|             let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new(); | ||||
|             for (key, outgoing_kind, event) in servercurrentevents | ||||
|                 .iter() | ||||
|                 .filter_map(|r| r.ok()) | ||||
|                 .filter_map(|(key, _)| { | ||||
|                     Self::parse_servercurrentpdus(&key) | ||||
|                     Self::parse_servercurrentevent(&key) | ||||
|                         .ok() | ||||
|                         .map(|(k, p)| (key, k, p.to_vec())) | ||||
|                         .map(|(k, e)| (key, k, e)) | ||||
|                 }) | ||||
|             { | ||||
|                 let entry = initial_transactions | ||||
|  | @ -72,39 +105,20 @@ impl Sending { | |||
| 
 | ||||
|                 if entry.len() > 30 { | ||||
|                     warn!( | ||||
|                         "Dropping some current pdu: {:?} {:?} {:?}", | ||||
|                         key, outgoing_kind, pdu | ||||
|                         "Dropping some current events: {:?} {:?} {:?}", | ||||
|                         key, outgoing_kind, event | ||||
|                     ); | ||||
|                     servercurrentpdus.remove(key).unwrap(); | ||||
|                     servercurrentevents.remove(key).unwrap(); | ||||
|                     continue; | ||||
|                 } | ||||
| 
 | ||||
|                 entry.push(pdu); | ||||
|                 entry.push(event); | ||||
|             } | ||||
| 
 | ||||
|             for (outgoing_kind, pdus) in initial_transactions { | ||||
|                 let mut prefix = match &outgoing_kind { | ||||
|                     OutgoingKind::Appservice(server) => { | ||||
|                         let mut p = b"+".to_vec(); | ||||
|                         p.extend_from_slice(server.as_bytes()); | ||||
|                         p | ||||
|                     } | ||||
|                     OutgoingKind::Push(user, pushkey) => { | ||||
|                         let mut p = b"$".to_vec(); | ||||
|                         p.extend_from_slice(&user); | ||||
|                         p.push(0xff); | ||||
|                         p.extend_from_slice(&pushkey); | ||||
|                         p | ||||
|                     } | ||||
|                     OutgoingKind::Normal(server) => { | ||||
|                         let mut p = Vec::new(); | ||||
|                         p.extend_from_slice(server.as_bytes()); | ||||
|                         p | ||||
|                     } | ||||
|                 }; | ||||
|                 prefix.push(0xff); | ||||
|                 current_transaction_status.insert(prefix, TransactionStatus::Running); | ||||
|                 futures.push(Self::handle_event(outgoing_kind.clone(), pdus, &db)); | ||||
|             for (outgoing_kind, events) in initial_transactions { | ||||
|                 current_transaction_status | ||||
|                     .insert(outgoing_kind.get_prefix(), TransactionStatus::Running); | ||||
|                 futures.push(Self::handle_events(outgoing_kind.clone(), events, &db)); | ||||
|             } | ||||
| 
 | ||||
|             loop { | ||||
|  | @ -112,58 +126,46 @@ impl Sending { | |||
|                     Some(response) = futures.next() => { | ||||
|                         match response { | ||||
|                             Ok(outgoing_kind) => { | ||||
|                                 let mut prefix = match &outgoing_kind { | ||||
|                                     OutgoingKind::Appservice(server) => { | ||||
|                                         let mut p = b"+".to_vec(); | ||||
|                                         p.extend_from_slice(server.as_bytes()); | ||||
|                                         p | ||||
|                                     } | ||||
|                                     OutgoingKind::Push(user, pushkey) => { | ||||
|                                         let mut p = b"$".to_vec(); | ||||
|                                         p.extend_from_slice(&user); | ||||
|                                         p.push(0xff); | ||||
|                                         p.extend_from_slice(&pushkey); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                     OutgoingKind::Normal(server) => { | ||||
|                                         let mut p = vec![]; | ||||
|                                         p.extend_from_slice(server.as_bytes()); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                 }; | ||||
|                                 prefix.push(0xff); | ||||
| 
 | ||||
|                                 for key in servercurrentpdus | ||||
|                                 let prefix = outgoing_kind.get_prefix(); | ||||
|                                 for key in servercurrentevents | ||||
|                                     .scan_prefix(&prefix) | ||||
|                                     .keys() | ||||
|                                     .filter_map(|r| r.ok()) | ||||
|                                 { | ||||
|                                     servercurrentpdus.remove(key).unwrap(); | ||||
|                                     servercurrentevents.remove(key).unwrap(); | ||||
|                                 } | ||||
| 
 | ||||
|                                 // Find events that have been added since starting the last request
 | ||||
|                                 let new_pdus = servernamepduids | ||||
|                                 let new_events = servernamepduids | ||||
|                                     .scan_prefix(&prefix) | ||||
|                                     .keys() | ||||
|                                     .filter_map(|r| r.ok()) | ||||
|                                     .map(|k| { | ||||
|                                         k[prefix.len()..].to_vec() | ||||
|                                         SendingEventType::Pdu(k[prefix.len()..].to_vec()) | ||||
|                                     }) | ||||
|                                     .take(30) | ||||
|                                     .collect::<Vec<_>>(); | ||||
| 
 | ||||
|                                 if !new_pdus.is_empty() { | ||||
|                                     for pdu_id in &new_pdus { | ||||
|                                 // TODO: find edus
 | ||||
| 
 | ||||
|                                 if !new_events.is_empty() { | ||||
|                                     // Insert pdus we found
 | ||||
|                                     for event in &new_events { | ||||
|                                         let mut current_key = prefix.clone(); | ||||
|                                         current_key.extend_from_slice(pdu_id); | ||||
|                                         servercurrentpdus.insert(¤t_key, &[]).unwrap(); | ||||
|                                         servernamepduids.remove(¤t_key).unwrap(); | ||||
|                                         match event { | ||||
|                                             SendingEventType::Pdu(b) | | ||||
|                                             SendingEventType::Edu(b) => { | ||||
|                                                 current_key.extend_from_slice(&b); | ||||
|                                                 servercurrentevents.insert(¤t_key, &[]).unwrap(); | ||||
|                                                 servernamepduids.remove(¤t_key).unwrap(); | ||||
|                                              } | ||||
|                                         } | ||||
|                                     } | ||||
| 
 | ||||
|                                     futures.push( | ||||
|                                         Self::handle_event( | ||||
|                                         Self::handle_events( | ||||
|                                             outgoing_kind.clone(), | ||||
|                                             new_pdus, | ||||
|                                             new_events, | ||||
|                                             &db, | ||||
|                                         ) | ||||
|                                     ); | ||||
|  | @ -172,29 +174,7 @@ impl Sending { | |||
|                                 } | ||||
|                             } | ||||
|                             Err((outgoing_kind, _)) => { | ||||
|                                 let mut prefix = match &outgoing_kind { | ||||
|                                     OutgoingKind::Appservice(serv) => { | ||||
|                                         let mut p = b"+".to_vec(); | ||||
|                                         p.extend_from_slice(serv.as_bytes()); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                     OutgoingKind::Push(user, pushkey) => { | ||||
|                                         let mut p = b"$".to_vec(); | ||||
|                                         p.extend_from_slice(&user); | ||||
|                                         p.push(0xff); | ||||
|                                         p.extend_from_slice(&pushkey); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                     OutgoingKind::Normal(serv) => { | ||||
|                                         let mut p = vec![]; | ||||
|                                         p.extend_from_slice(serv.as_bytes()); | ||||
|                                         p | ||||
|                                     }, | ||||
|                                 }; | ||||
| 
 | ||||
|                                 prefix.push(0xff); | ||||
| 
 | ||||
|                                 current_transaction_status.entry(prefix).and_modify(|e| *e = match e { | ||||
|                                 current_transaction_status.entry(outgoing_kind.get_prefix()).and_modify(|e| *e = match e { | ||||
|                                     TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), | ||||
|                                     TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), | ||||
|                                     TransactionStatus::Failed(_, _) => { | ||||
|  | @ -206,97 +186,17 @@ impl Sending { | |||
|                         }; | ||||
|                     }, | ||||
|                     Some(event) = &mut subscriber => { | ||||
|                         if let sled::Event::Insert { key, .. } = event { | ||||
|                         // New sled version:
 | ||||
|                         //for (_tree, key, value_opt) in &event {
 | ||||
|                         //    if value_opt.is_none() {
 | ||||
|                         //        continue;
 | ||||
|                         //    }
 | ||||
| 
 | ||||
|                             let servernamepduid = key.clone(); | ||||
| 
 | ||||
|                             let mut retry = false; | ||||
| 
 | ||||
|                             if let Some((outgoing_kind, prefix, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid) | ||||
|                                 .ok() | ||||
|                                 .map(|(outgoing_kind, pdu_id)| { | ||||
|                                     let mut prefix = match &outgoing_kind { | ||||
|                                         OutgoingKind::Appservice(serv) => { | ||||
|                                             let mut p = b"+".to_vec(); | ||||
|                                             p.extend_from_slice(serv.as_bytes()); | ||||
|                                             p | ||||
|                                     }, | ||||
|                                         OutgoingKind::Push(user, pushkey) => { | ||||
|                                             let mut p = b"$".to_vec(); | ||||
|                                             p.extend_from_slice(&user); | ||||
|                                             p.push(0xff); | ||||
|                                             p.extend_from_slice(&pushkey); | ||||
|                                             p | ||||
|                                         }, | ||||
|                                         OutgoingKind::Normal(serv) => { | ||||
|                                             let mut p = vec![]; | ||||
|                                             p.extend_from_slice(serv.as_bytes()); | ||||
|                                             p | ||||
|                                         }, | ||||
|                                     }; | ||||
|                                     prefix.push(0xff); | ||||
| 
 | ||||
|                                     (outgoing_kind, prefix, pdu_id) | ||||
|                                 }) | ||||
|                                 .filter(|(_, prefix, _)| { | ||||
|                                     let entry = current_transaction_status.entry(prefix.clone()); | ||||
|                                     let mut allow = true; | ||||
| 
 | ||||
|                                     entry.and_modify(|e| match e { | ||||
|                                         TransactionStatus::Running | TransactionStatus::Retrying(_) => { | ||||
|                                             allow = false; // already running
 | ||||
|                                         }, | ||||
|                                         TransactionStatus::Failed(tries, time) => { | ||||
|                                             // Fail if a request has failed recently (exponential backoff)
 | ||||
|                                             let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); | ||||
|                                             if min_elapsed_duration > Duration::from_secs(60*60*24) { | ||||
|                                                 min_elapsed_duration = Duration::from_secs(60*60*24); | ||||
|                                             } | ||||
| 
 | ||||
|                                             if time.elapsed() < min_elapsed_duration { | ||||
|                                                 allow = false; | ||||
|                                             } else { | ||||
|                                                 retry = true; | ||||
|                                                 *e = TransactionStatus::Retrying(*tries); | ||||
|                                             } | ||||
|                                         } | ||||
|                                     }).or_insert(TransactionStatus::Running); | ||||
| 
 | ||||
|                                     allow | ||||
|                                 }) | ||||
|                             { | ||||
|                                 let mut pdus = Vec::new(); | ||||
| 
 | ||||
|                                 if retry { | ||||
|                                     // We retry the previous transaction
 | ||||
|                                     for pdu in servercurrentpdus | ||||
|                                         .scan_prefix(&prefix) | ||||
|                                         .filter_map(|r| r.ok()) | ||||
|                                         .filter_map(|(key, _)| { | ||||
|                                             Self::parse_servercurrentpdus(&key) | ||||
|                                                 .ok() | ||||
|                                                 .map(|(_, p)| p.to_vec()) | ||||
|                                         }) | ||||
|                                     { | ||||
|                                         pdus.push(pdu); | ||||
|                                     } | ||||
|                                 } else { | ||||
|                                     servercurrentpdus.insert(&key, &[]).unwrap(); | ||||
|                                     servernamepduids.remove(&key).unwrap(); | ||||
|                                     pdus.push(pdu_id.to_vec()); | ||||
|                         if let sled::Event::Insert { key, .. } = event { | ||||
|                             if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key) { | ||||
|                                 if let Some(events) = Self::select_events(&outgoing_kind, vec![(event, key)], &mut current_transaction_status, &servercurrentevents, &servernamepduids) { | ||||
|                                     futures.push(Self::handle_events(outgoing_kind, events, &db)); | ||||
|                                 } | ||||
|                                 futures.push( | ||||
|                                     Self::handle_event( | ||||
|                                         outgoing_kind, | ||||
|                                         pdus, | ||||
|                                         &db, | ||||
|                                     ) | ||||
|                                 ); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|  | @ -305,6 +205,73 @@ impl Sending { | |||
|         }); | ||||
|     } | ||||
| 
 | ||||
|     fn select_events( | ||||
|         outgoing_kind: &OutgoingKind, | ||||
|         new_events: Vec<(SendingEventType, IVec)>, // Events we want to send: event and full key
 | ||||
|         current_transaction_status: &mut HashMap<Vec<u8>, TransactionStatus>, | ||||
|         servercurrentevents: &sled::Tree, | ||||
|         servernamepduids: &sled::Tree, | ||||
|     ) -> Option<Vec<SendingEventType>> { | ||||
|         let mut retry = false; | ||||
|         let mut allow = true; | ||||
| 
 | ||||
|         let prefix = outgoing_kind.get_prefix(); | ||||
|         let entry = current_transaction_status.entry(prefix.clone()); | ||||
| 
 | ||||
|         entry | ||||
|             .and_modify(|e| match e { | ||||
|                 TransactionStatus::Running | TransactionStatus::Retrying(_) => { | ||||
|                     allow = false; // already running
 | ||||
|                 } | ||||
|                 TransactionStatus::Failed(tries, time) => { | ||||
|                     // Fail if a request has failed recently (exponential backoff)
 | ||||
|                     let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); | ||||
|                     if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) { | ||||
|                         min_elapsed_duration = Duration::from_secs(60 * 60 * 24); | ||||
|                     } | ||||
| 
 | ||||
|                     if time.elapsed() < min_elapsed_duration { | ||||
|                         allow = false; | ||||
|                     } else { | ||||
|                         retry = true; | ||||
|                         *e = TransactionStatus::Retrying(*tries); | ||||
|                     } | ||||
|                 } | ||||
|             }) | ||||
|             .or_insert(TransactionStatus::Running); | ||||
| 
 | ||||
|         if !allow { | ||||
|             return None; | ||||
|         } | ||||
| 
 | ||||
|         let mut events = Vec::new(); | ||||
| 
 | ||||
|         if retry { | ||||
|             // We retry the previous transaction
 | ||||
|             for key in servercurrentevents | ||||
|                 .scan_prefix(&prefix) | ||||
|                 .keys() | ||||
|                 .filter_map(|r| r.ok()) | ||||
|             { | ||||
|                 if let Ok((_, e)) = Self::parse_servercurrentevent(&key) { | ||||
|                     events.push(e); | ||||
|                 } | ||||
|             } | ||||
|         } else { | ||||
|             for (e, full_key) in new_events { | ||||
|                 servercurrentevents.insert(&full_key, &[]).unwrap(); | ||||
| 
 | ||||
|                 // If it was a PDU we have to unqueue it
 | ||||
|                 // TODO: don't try to unqueue EDUs
 | ||||
|                 servernamepduids.remove(&full_key).unwrap(); | ||||
| 
 | ||||
|                 events.push(e); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Some(events) | ||||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip(self))] | ||||
|     pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: IVec) -> Result<()> { | ||||
|         let mut key = b"$".to_vec(); | ||||
|  | @ -338,7 +305,7 @@ impl Sending { | |||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument] | ||||
|     fn calculate_hash(keys: &[Vec<u8>]) -> Vec<u8> { | ||||
|     fn calculate_hash(keys: &[&[u8]]) -> Vec<u8> { | ||||
|         // We only hash the pdu's event ids, not the whole pdu
 | ||||
|         let bytes = keys.join(&0xff); | ||||
|         let hash = digest::digest(&digest::SHA256, &bytes); | ||||
|  | @ -346,33 +313,37 @@ impl Sending { | |||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip(db))] | ||||
|     async fn handle_event( | ||||
|     async fn handle_events( | ||||
|         kind: OutgoingKind, | ||||
|         pdu_ids: Vec<Vec<u8>>, | ||||
|         events: Vec<SendingEventType>, | ||||
|         db: &Database, | ||||
|     ) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> { | ||||
|         match &kind { | ||||
|             OutgoingKind::Appservice(server) => { | ||||
|                 let pdu_jsons = pdu_ids | ||||
|                     .iter() | ||||
|                     .map(|pdu_id| { | ||||
|                         Ok::<_, (Box<ServerName>, Error)>( | ||||
|                             db.rooms | ||||
|                                 .get_pdu_from_id(pdu_id) | ||||
|                                 .map_err(|e| (server.clone(), e))? | ||||
|                 let mut pdu_jsons = Vec::new(); | ||||
| 
 | ||||
|                 for event in &events { | ||||
|                     match event { | ||||
|                         SendingEventType::Pdu(pdu_id) => { | ||||
|                             pdu_jsons.push(db.rooms | ||||
|                                 .get_pdu_from_id(&pdu_id) | ||||
|                                 .map_err(|e| (kind.clone(), e))? | ||||
|                                 .ok_or_else(|| { | ||||
|                                     ( | ||||
|                                         server.clone(), | ||||
|                                         kind.clone(), | ||||
|                                         Error::bad_database( | ||||
|                                             "[Appservice] Event in servernamepduids not found in db.", | ||||
|                                         ), | ||||
|                                     ) | ||||
|                                 })? | ||||
|                                 .to_any_event(), | ||||
|                         ) | ||||
|                     }) | ||||
|                     .filter_map(|r| r.ok()) | ||||
|                     .collect::<Vec<_>>(); | ||||
|                                 .to_any_event()) | ||||
|                         } | ||||
|                         SendingEventType::Edu(_) => { | ||||
|                             // Appservices don't need EDUs (?)
 | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 let permit = db.sending.maximum_requests.acquire().await; | ||||
| 
 | ||||
|                 let response = appservice_server::send_request( | ||||
|  | @ -384,7 +355,14 @@ impl Sending { | |||
|                     appservice::event::push_events::v1::Request { | ||||
|                         events: &pdu_jsons, | ||||
|                         txn_id: &base64::encode_config( | ||||
|                             Self::calculate_hash(&pdu_ids), | ||||
|                             Self::calculate_hash( | ||||
|                                 &events | ||||
|                                     .iter() | ||||
|                                     .map(|e| match e { | ||||
|                                         SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, | ||||
|                                     }) | ||||
|                                     .collect::<Vec<_>>(), | ||||
|                             ), | ||||
|                             base64::URL_SAFE_NO_PAD, | ||||
|                         ), | ||||
|                     }, | ||||
|  | @ -398,25 +376,30 @@ impl Sending { | |||
|                 response | ||||
|             } | ||||
|             OutgoingKind::Push(user, pushkey) => { | ||||
|                 let pdus = pdu_ids | ||||
|                     .iter() | ||||
|                     .map(|pdu_id| { | ||||
|                         Ok::<_, (Vec<u8>, Error)>( | ||||
|                             db.rooms | ||||
|                                 .get_pdu_from_id(pdu_id) | ||||
|                                 .map_err(|e| (pushkey.clone(), e))? | ||||
|                                 .ok_or_else(|| { | ||||
|                                     ( | ||||
|                                         pushkey.clone(), | ||||
|                                         Error::bad_database( | ||||
|                                             "[Push] Event in servernamepduids not found in db.", | ||||
|                                         ), | ||||
|                                     ) | ||||
|                                 })?, | ||||
|                         ) | ||||
|                     }) | ||||
|                     .filter_map(|r| r.ok()) | ||||
|                     .collect::<Vec<_>>(); | ||||
|                 let mut pdus = Vec::new(); | ||||
| 
 | ||||
|                 for event in &events { | ||||
|                     match event { | ||||
|                         SendingEventType::Pdu(pdu_id) => { | ||||
|                             pdus.push( | ||||
|                                 db.rooms | ||||
|                                     .get_pdu_from_id(&pdu_id) | ||||
|                                     .map_err(|e| (kind.clone(), e))? | ||||
|                                     .ok_or_else(|| { | ||||
|                                         ( | ||||
|                                             kind.clone(), | ||||
|                                             Error::bad_database( | ||||
|                                                 "[Push] Event in servernamepduids not found in db.", | ||||
|                                             ), | ||||
|                                         ) | ||||
|                                     })?, | ||||
|                             ); | ||||
|                         } | ||||
|                         SendingEventType::Edu(_) => { | ||||
|                             // Push gateways don't need EDUs (?)
 | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 for pdu in pdus { | ||||
|                     // Redacted events are not notification targets (we don't send push for them)
 | ||||
|  | @ -427,13 +410,13 @@ impl Sending { | |||
|                     let userid = | ||||
|                         UserId::try_from(utils::string_from_bytes(user).map_err(|_| { | ||||
|                             ( | ||||
|                                 OutgoingKind::Push(user.clone(), pushkey.clone()), | ||||
|                                 kind.clone(), | ||||
|                                 Error::bad_database("Invalid push user string in db."), | ||||
|                             ) | ||||
|                         })?) | ||||
|                         .map_err(|_| { | ||||
|                             ( | ||||
|                                 OutgoingKind::Push(user.clone(), pushkey.clone()), | ||||
|                                 kind.clone(), | ||||
|                                 Error::bad_database("Invalid push user id in db."), | ||||
|                             ) | ||||
|                         })?; | ||||
|  | @ -484,15 +467,17 @@ impl Sending { | |||
|                 Ok(OutgoingKind::Push(user.clone(), pushkey.clone())) | ||||
|             } | ||||
|             OutgoingKind::Normal(server) => { | ||||
|                 let pdu_jsons = pdu_ids | ||||
|                     .iter() | ||||
|                     .map(|pdu_id| { | ||||
|                         Ok::<_, (OutgoingKind, Error)>( | ||||
|                 let mut edu_jsons = Vec::new(); | ||||
|                 let mut pdu_jsons = Vec::new(); | ||||
| 
 | ||||
|                 for event in &events { | ||||
|                     match event { | ||||
|                         SendingEventType::Pdu(pdu_id) => { | ||||
|                             // TODO: check room version and remove event_id if needed
 | ||||
|                             serde_json::from_str( | ||||
|                             pdu_jsons.push(serde_json::from_str( | ||||
|                                 PduEvent::convert_to_outgoing_federation_event( | ||||
|                                     db.rooms | ||||
|                                         .get_pdu_json_from_id(pdu_id) | ||||
|                                         .get_pdu_json_from_id(&pdu_id) | ||||
|                                         .map_err(|e| (OutgoingKind::Normal(server.clone()), e))? | ||||
|                                         .ok_or_else(|| { | ||||
|                                             ( | ||||
|  | @ -506,11 +491,15 @@ impl Sending { | |||
|                                 .json() | ||||
|                                 .get(), | ||||
|                             ) | ||||
|                             .expect("Raw<..> is always valid"), | ||||
|                         ) | ||||
|                     }) | ||||
|                     .filter_map(|r| r.ok()) | ||||
|                     .collect::<Vec<_>>(); | ||||
|                             .expect("Raw<..> is always valid")); | ||||
|                         } | ||||
|                         SendingEventType::Edu(edu) => { | ||||
|                             edu_jsons.push( | ||||
|                                 serde_json::from_slice(edu).expect("Raw<..> is always valid"), | ||||
|                             ); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 let permit = db.sending.maximum_requests.acquire().await; | ||||
| 
 | ||||
|  | @ -520,10 +509,17 @@ impl Sending { | |||
|                     send_transaction_message::v1::Request { | ||||
|                         origin: db.globals.server_name(), | ||||
|                         pdus: &pdu_jsons, | ||||
|                         edus: &[], | ||||
|                         edus: &edu_jsons, | ||||
|                         origin_server_ts: SystemTime::now(), | ||||
|                         transaction_id: &base64::encode_config( | ||||
|                             Self::calculate_hash(&pdu_ids), | ||||
|                             Self::calculate_hash( | ||||
|                                 &events | ||||
|                                     .iter() | ||||
|                                     .map(|e| match e { | ||||
|                                         SendingEventType::Edu(b) | SendingEventType::Pdu(b) => &**b, | ||||
|                                     }) | ||||
|                                     .collect::<Vec<_>>(), | ||||
|                             ), | ||||
|                             base64::URL_SAFE_NO_PAD, | ||||
|                         ), | ||||
|                     }, | ||||
|  | @ -546,13 +542,13 @@ impl Sending { | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn parse_servercurrentpdus(key: &IVec) -> Result<(OutgoingKind, IVec)> { | ||||
|     fn parse_servercurrentevent(key: &IVec) -> Result<(OutgoingKind, SendingEventType)> { | ||||
|         // Appservices start with a plus
 | ||||
|         Ok::<_, Error>(if key.starts_with(b"+") { | ||||
|             let mut parts = key[1..].splitn(2, |&b| b == 0xff); | ||||
| 
 | ||||
|             let server = parts.next().expect("splitn always returns one element"); | ||||
|             let pdu = parts | ||||
|             let event = parts | ||||
|                 .next() | ||||
|                 .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; | ||||
|             let server = utils::string_from_bytes(&server).map_err(|_| { | ||||
|  | @ -563,7 +559,11 @@ impl Sending { | |||
|                 OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| { | ||||
|                     Error::bad_database("Invalid server string in server_currenttransaction") | ||||
|                 })?), | ||||
|                 IVec::from(pdu), | ||||
|                 if event.starts_with(b"*") { | ||||
|                     SendingEventType::Edu(event[1..].to_vec()) | ||||
|                 } else { | ||||
|                     SendingEventType::Pdu(event.to_vec()) | ||||
|                 }, | ||||
|             ) | ||||
|         } else if key.starts_with(b"$") { | ||||
|             let mut parts = key[1..].splitn(3, |&b| b == 0xff); | ||||
|  | @ -572,18 +572,22 @@ impl Sending { | |||
|             let pushkey = parts | ||||
|                 .next() | ||||
|                 .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; | ||||
|             let pdu = parts | ||||
|             let event = parts | ||||
|                 .next() | ||||
|                 .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; | ||||
|             ( | ||||
|                 OutgoingKind::Push(user.to_vec(), pushkey.to_vec()), | ||||
|                 IVec::from(pdu), | ||||
|                 if event.starts_with(b"*") { | ||||
|                     SendingEventType::Edu(event[1..].to_vec()) | ||||
|                 } else { | ||||
|                     SendingEventType::Pdu(event.to_vec()) | ||||
|                 }, | ||||
|             ) | ||||
|         } else { | ||||
|             let mut parts = key.splitn(2, |&b| b == 0xff); | ||||
| 
 | ||||
|             let server = parts.next().expect("splitn always returns one element"); | ||||
|             let pdu = parts | ||||
|             let event = parts | ||||
|                 .next() | ||||
|                 .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; | ||||
|             let server = utils::string_from_bytes(&server).map_err(|_| { | ||||
|  | @ -594,7 +598,11 @@ impl Sending { | |||
|                 OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| { | ||||
|                     Error::bad_database("Invalid server string in server_currenttransaction") | ||||
|                 })?), | ||||
|                 IVec::from(pdu), | ||||
|                 if event.starts_with(b"*") { | ||||
|                     SendingEventType::Edu(event[1..].to_vec()) | ||||
|                 } else { | ||||
|                     SendingEventType::Pdu(event.to_vec()) | ||||
|                 }, | ||||
|             ) | ||||
|         }) | ||||
|     } | ||||
|  |  | |||
|  | @ -1,4 +1,4 @@ | |||
| use log::error; | ||||
| use log::{error, warn}; | ||||
| use ruma::api::client::{error::ErrorKind, r0::uiaa::UiaaInfo}; | ||||
| use thiserror::Error; | ||||
| 
 | ||||
|  | @ -92,6 +92,8 @@ where | |||
|             _ => (Unknown, StatusCode::INTERNAL_SERVER_ERROR), | ||||
|         }; | ||||
| 
 | ||||
|         warn!("{}: {}", status_code, message); | ||||
| 
 | ||||
|         RumaResponse::from(RumaError { | ||||
|             kind, | ||||
|             message, | ||||
|  |  | |||
|  | @ -205,13 +205,16 @@ async fn main() { | |||
| 
 | ||||
|         let root = span!(tracing::Level::INFO, "app_start", work_units = 2); | ||||
|         let _enter = root.enter(); | ||||
| 
 | ||||
|         let rocket = setup_rocket(raw_config, db); | ||||
|         rocket.launch().await.unwrap(); | ||||
|     } else { | ||||
|         std::env::set_var("CONDUIT_LOG", config.log); | ||||
|         pretty_env_logger::init_custom_env("CONDUIT_LOG"); | ||||
|     } | ||||
| 
 | ||||
|     let rocket = setup_rocket(raw_config, db); | ||||
|     rocket.launch().await.unwrap(); | ||||
|         let rocket = setup_rocket(raw_config, db); | ||||
|         rocket.launch().await.unwrap(); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[catch(404)] | ||||
|  |  | |||
|  | @ -27,11 +27,12 @@ use ruma::{ | |||
|     }, | ||||
|     directory::{IncomingFilter, IncomingRoomNetwork}, | ||||
|     events::{ | ||||
|         receipt::{ReceiptEvent, ReceiptEventContent}, | ||||
|         room::{ | ||||
|             create::CreateEventContent, | ||||
|             member::{MemberEventContent, MembershipState}, | ||||
|         }, | ||||
|         EventType, | ||||
|         AnyEphemeralRoomEvent, AnyEvent as EduEvent, EventType, | ||||
|     }, | ||||
|     serde::Raw, | ||||
|     signatures::{CanonicalJsonObject, CanonicalJsonValue}, | ||||
|  | @ -585,35 +586,6 @@ pub async fn send_transaction_message_route<'a>( | |||
|         return Err(Error::bad_config("Federation is disabled.")); | ||||
|     } | ||||
| 
 | ||||
|     for edu in body | ||||
|         .edus | ||||
|         .iter() | ||||
|         .map(|edu| serde_json::from_str::<Edu>(edu.json().get())) | ||||
|         .filter_map(|r| r.ok()) | ||||
|     { | ||||
|         match edu { | ||||
|             Edu::Presence(_) => {} | ||||
|             Edu::Receipt(_) => {} | ||||
|             Edu::Typing(typing) => { | ||||
|                 if typing.typing { | ||||
|                     db.rooms.edus.typing_add( | ||||
|                         &typing.user_id, | ||||
|                         &typing.room_id, | ||||
|                         3000 + utils::millis_since_unix_epoch(), | ||||
|                         &db.globals, | ||||
|                     )?; | ||||
|                 } else { | ||||
|                     db.rooms | ||||
|                         .edus | ||||
|                         .typing_remove(&typing.user_id, &typing.room_id, &db.globals)?; | ||||
|                 } | ||||
|             } | ||||
|             Edu::DeviceListUpdate(_) => {} | ||||
|             Edu::DirectToDevice(_) => {} | ||||
|             Edu::_Custom(_) => {} | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let mut resolved_map = BTreeMap::new(); | ||||
| 
 | ||||
|     let pub_key_map = RwLock::new(BTreeMap::new()); | ||||
|  | @ -659,6 +631,73 @@ pub async fn send_transaction_message_route<'a>( | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     for edu in body | ||||
|         .edus | ||||
|         .iter() | ||||
|         .map(|edu| serde_json::from_str::<Edu>(edu.json().get())) | ||||
|         .filter_map(|r| r.ok()) | ||||
|     { | ||||
|         match edu { | ||||
|             Edu::Presence(_) => {} | ||||
|             Edu::Receipt(receipt) => { | ||||
|                 for (room_id, room_updates) in receipt.receipts { | ||||
|                     for (user_id, user_updates) in room_updates.read { | ||||
|                         if let Some((event_id, _)) = user_updates | ||||
|                             .event_ids | ||||
|                             .iter() | ||||
|                             .filter_map(|id| { | ||||
|                                 db.rooms.get_pdu_count(&id).ok().flatten().map(|r| (id, r)) | ||||
|                             }) | ||||
|                             .max_by_key(|(_, count)| *count) | ||||
|                         { | ||||
|                             let mut user_receipts = BTreeMap::new(); | ||||
|                             user_receipts.insert(user_id.clone(), user_updates.data); | ||||
| 
 | ||||
|                             let mut receipt_content = BTreeMap::new(); | ||||
|                             receipt_content.insert( | ||||
|                                 event_id.to_owned(), | ||||
|                                 ruma::events::receipt::Receipts { | ||||
|                                     read: Some(user_receipts), | ||||
|                                 }, | ||||
|                             ); | ||||
| 
 | ||||
|                             let event = | ||||
|                                 EduEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt(ReceiptEvent { | ||||
|                                     content: ReceiptEventContent(receipt_content), | ||||
|                                     room_id: room_id.clone(), | ||||
|                                 })); | ||||
|                             db.rooms.edus.readreceipt_update( | ||||
|                                 &user_id, | ||||
|                                 &room_id, | ||||
|                                 event, | ||||
|                                 &db.globals, | ||||
|                             )?; | ||||
|                         } else { | ||||
|                             warn!("No known event ids in read receipt: {:?}", user_updates); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             Edu::Typing(typing) => { | ||||
|                 if typing.typing { | ||||
|                     db.rooms.edus.typing_add( | ||||
|                         &typing.user_id, | ||||
|                         &typing.room_id, | ||||
|                         3000 + utils::millis_since_unix_epoch(), | ||||
|                         &db.globals, | ||||
|                     )?; | ||||
|                 } else { | ||||
|                     db.rooms | ||||
|                         .edus | ||||
|                         .typing_remove(&typing.user_id, &typing.room_id, &db.globals)?; | ||||
|                 } | ||||
|             } | ||||
|             Edu::DeviceListUpdate(_) => {} | ||||
|             Edu::DirectToDevice(_) => {} | ||||
|             Edu::_Custom(_) => {} | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into()) | ||||
| } | ||||
| 
 | ||||
|  | @ -1134,7 +1173,7 @@ pub fn handle_incoming_pdu<'a>( | |||
|                     .await | ||||
|                     { | ||||
|                         // This should always contain exactly one element when Ok
 | ||||
|                         Ok(events) => state_auth.push(events[0].clone()), | ||||
|                         Ok(events) => state_auth.extend_from_slice(&events), | ||||
|                         Err(e) => { | ||||
|                             debug!("Event was not present: {}", e); | ||||
|                         } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue