improvement: get_missing_events route and cleanup
This commit is contained in:
		
							parent
							
								
									ab332363ce
								
							
						
					
					
						commit
						0d6159c2da
					
				
					 5 changed files with 95 additions and 80 deletions
				
			
		
							
								
								
									
										32
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										32
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							|  | @ -627,9 +627,9 @@ dependencies = [ | |||
| 
 | ||||
| [[package]] | ||||
| name = "hermit-abi" | ||||
| version = "0.1.15" | ||||
| version = "0.1.16" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "3deed196b6e7f9e44a2ae8d94225d80302d81208b1bb673fd21fe634645c85a9" | ||||
| checksum = "4c30f6d0bc6b00693347368a67d41b58f2fb851215ff1da49e90fe2c5c667151" | ||||
| dependencies = [ | ||||
|  "libc", | ||||
| ] | ||||
|  | @ -1547,7 +1547,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma" | ||||
| version = "0.0.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "ruma-api", | ||||
|  "ruma-appservice-api", | ||||
|  | @ -1563,7 +1563,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-api" | ||||
| version = "0.17.0-alpha.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "http", | ||||
|  "percent-encoding", | ||||
|  | @ -1578,7 +1578,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-api-macros" | ||||
| version = "0.17.0-alpha.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "proc-macro-crate", | ||||
|  "proc-macro2", | ||||
|  | @ -1589,7 +1589,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-appservice-api" | ||||
| version = "0.2.0-alpha.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "ruma-api", | ||||
|  "ruma-common", | ||||
|  | @ -1602,7 +1602,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-client-api" | ||||
| version = "0.10.0-alpha.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "assign", | ||||
|  "http", | ||||
|  | @ -1621,7 +1621,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-common" | ||||
| version = "0.2.0" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "js_int", | ||||
|  "ruma-api", | ||||
|  | @ -1635,7 +1635,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-events" | ||||
| version = "0.22.0-alpha.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "js_int", | ||||
|  "ruma-common", | ||||
|  | @ -1650,7 +1650,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-events-macros" | ||||
| version = "0.22.0-alpha.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "proc-macro-crate", | ||||
|  "proc-macro2", | ||||
|  | @ -1661,7 +1661,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-federation-api" | ||||
| version = "0.0.3" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "js_int", | ||||
|  "ruma-api", | ||||
|  | @ -1676,7 +1676,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-identifiers" | ||||
| version = "0.17.4" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "rand", | ||||
|  "ruma-identifiers-macros", | ||||
|  | @ -1688,7 +1688,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-identifiers-macros" | ||||
| version = "0.17.4" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "proc-macro2", | ||||
|  "quote", | ||||
|  | @ -1699,7 +1699,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-identifiers-validation" | ||||
| version = "0.1.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "serde", | ||||
|  "strum", | ||||
|  | @ -1708,7 +1708,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-serde" | ||||
| version = "0.2.3" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "form_urlencoded", | ||||
|  "itoa", | ||||
|  | @ -1720,7 +1720,7 @@ dependencies = [ | |||
| [[package]] | ||||
| name = "ruma-signatures" | ||||
| version = "0.6.0-dev.1" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#a6486e7a00183b4578650528d65e83318da53b23" | ||||
| source = "git+https://github.com/timokoesters/ruma?branch=timo-fed-fixes#47fab87325b71b7f6c2fb3cd276d1f813e42abf7" | ||||
| dependencies = [ | ||||
|  "base64", | ||||
|  "ring", | ||||
|  |  | |||
|  | @ -1,10 +1,10 @@ | |||
| use std::{collections::HashSet, convert::TryFrom, time::SystemTime}; | ||||
| 
 | ||||
| use crate::{server_server, utils, Error, Result}; | ||||
| use crate::{server_server, utils, Error, PduEvent, Result}; | ||||
| use federation::transactions::send_transaction_message; | ||||
| use log::warn; | ||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||
| use ruma::{api::federation, Raw, ServerName}; | ||||
| use ruma::{api::federation, ServerName}; | ||||
| use sled::IVec; | ||||
| use tokio::select; | ||||
| 
 | ||||
|  | @ -83,49 +83,27 @@ impl Sending { | |||
|         (Box<ServerName>, send_transaction_message::v1::Response), | ||||
|         (Box<ServerName>, Error), | ||||
|     > { | ||||
|         let mut pdu_json = rooms | ||||
|             .get_pdu_json_from_id(&pdu_id) | ||||
|             .map_err(|e| (server.clone(), e))? | ||||
|             .ok_or_else(|| { | ||||
|                 ( | ||||
|                     server.clone(), | ||||
|                     Error::bad_database("Event in serverpduids not found in db."), | ||||
|                 ) | ||||
|             })?; | ||||
| 
 | ||||
|         if let Some(unsigned) = pdu_json | ||||
|             .as_object_mut() | ||||
|             .expect("json is object") | ||||
|             .get_mut("unsigned") | ||||
|         { | ||||
|             unsigned | ||||
|                 .as_object_mut() | ||||
|                 .expect("unsigned is object") | ||||
|                 .remove("transaction_id"); | ||||
|         } | ||||
| 
 | ||||
|         pdu_json | ||||
|             .as_object_mut() | ||||
|             .expect("json is object") | ||||
|             .remove("event_id"); | ||||
| 
 | ||||
|         let raw_json = | ||||
|             serde_json::from_value::<Raw<_>>(pdu_json).expect("Raw::from_value always works"); | ||||
| 
 | ||||
|         let globals = &globals; | ||||
| 
 | ||||
|         let pdus = vec![raw_json]; | ||||
|         let transaction_id = utils::random_string(16); | ||||
|         let pdu_json = PduEvent::to_outgoing_federation_event( | ||||
|             rooms | ||||
|                 .get_pdu_json_from_id(&pdu_id) | ||||
|                 .map_err(|e| (server.clone(), e))? | ||||
|                 .ok_or_else(|| { | ||||
|                     ( | ||||
|                         server.clone(), | ||||
|                         Error::bad_database("Event in serverpduids not found in db."), | ||||
|                     ) | ||||
|                 })?, | ||||
|         ); | ||||
| 
 | ||||
|         server_server::send_request( | ||||
|             &globals, | ||||
|             server.clone(), | ||||
|             send_transaction_message::v1::Request { | ||||
|                 origin: globals.server_name(), | ||||
|                 pdus: &pdus, | ||||
|                 pdus: &[pdu_json], | ||||
|                 edus: &[], | ||||
|                 origin_server_ts: SystemTime::now(), | ||||
|                 transaction_id: &transaction_id, | ||||
|                 transaction_id: &utils::random_string(16), | ||||
|             }, | ||||
|         ) | ||||
|         .await | ||||
|  |  | |||
|  | @ -125,6 +125,7 @@ fn setup_rocket() -> rocket::Rocket { | |||
|                 server_server::get_public_rooms_route, | ||||
|                 server_server::get_public_rooms_filtered_route, | ||||
|                 server_server::send_transaction_message_route, | ||||
|                 server_server::get_missing_events_route, | ||||
|             ], | ||||
|         ) | ||||
|         .attach(AdHoc::on_attach("Config", |mut rocket| async { | ||||
|  |  | |||
							
								
								
									
										43
									
								
								src/pdu.rs
									
									
									
									
									
								
							
							
						
						
									
										43
									
								
								src/pdu.rs
									
									
									
									
									
								
							|  | @ -1,7 +1,6 @@ | |||
| use crate::Error; | ||||
| use js_int::UInt; | ||||
| use ruma::{ | ||||
|     events::pdu::PduStub, | ||||
|     events::{ | ||||
|         pdu::EventHash, room::member::MemberEventContent, AnyEvent, AnyRoomEvent, AnyStateEvent, | ||||
|         AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, StateEvent, | ||||
|  | @ -200,32 +199,26 @@ impl PduEvent { | |||
|         serde_json::from_value(json).expect("Raw::from_value always works") | ||||
|     } | ||||
| 
 | ||||
|     pub fn to_outgoing_federation_event(&self) -> Raw<PduStub> { | ||||
|         let mut unsigned = self.unsigned.clone(); | ||||
|         unsigned.remove("transaction_id"); | ||||
| 
 | ||||
|         let mut json = json!({ | ||||
|             "room_id": self.room_id, | ||||
|             "sender": self.sender, | ||||
|             "origin_server_ts": self.origin_server_ts, | ||||
|             "type": self.kind, | ||||
|             "content": self.content, | ||||
|             "prev_events": self.prev_events, | ||||
|             "depth": self.depth, | ||||
|             "auth_events": self.auth_events, | ||||
|             "unsigned": unsigned, | ||||
|             "hashes": self.hashes, | ||||
|             "signatures": self.signatures, | ||||
|         }); | ||||
| 
 | ||||
|         if let Some(state_key) = &self.state_key { | ||||
|             json["state_key"] = json!(state_key); | ||||
|         } | ||||
|         if let Some(redacts) = &self.redacts { | ||||
|             json["redacts"] = json!(redacts); | ||||
|     pub fn to_outgoing_federation_event( | ||||
|         mut pdu_json: serde_json::Value, | ||||
|     ) -> Raw<ruma::events::pdu::PduStub> { | ||||
|         if let Some(unsigned) = pdu_json | ||||
|             .as_object_mut() | ||||
|             .expect("json is object") | ||||
|             .get_mut("unsigned") | ||||
|         { | ||||
|             unsigned | ||||
|                 .as_object_mut() | ||||
|                 .expect("unsigned is object") | ||||
|                 .remove("transaction_id"); | ||||
|         } | ||||
| 
 | ||||
|         serde_json::from_value(json).expect("Raw::from_value always works") | ||||
|         pdu_json | ||||
|             .as_object_mut() | ||||
|             .expect("json is object") | ||||
|             .remove("event_id"); | ||||
| 
 | ||||
|         serde_json::from_value::<Raw<_>>(pdu_json).expect("Raw::from_value always works") | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -3,13 +3,13 @@ use http::header::{HeaderValue, AUTHORIZATION, HOST}; | |||
| use log::warn; | ||||
| use rocket::{get, post, put, response::content::Json, State}; | ||||
| use ruma::{ | ||||
|     api::federation::directory::get_public_rooms_filtered, | ||||
|     api::{ | ||||
|         federation::{ | ||||
|             directory::get_public_rooms, | ||||
|             directory::{get_public_rooms, get_public_rooms_filtered}, | ||||
|             discovery::{ | ||||
|                 get_server_keys, get_server_version::v1 as get_server_version, ServerKey, VerifyKey, | ||||
|             }, | ||||
|             event::get_missing_events, | ||||
|             transactions::send_transaction_message, | ||||
|         }, | ||||
|         OutgoingRequest, | ||||
|  | @ -373,3 +373,46 @@ pub fn send_transaction_message_route<'a>( | |||
|     } | ||||
|     .into()) | ||||
| } | ||||
| 
 | ||||
| #[cfg_attr(
 | ||||
|     feature = "conduit_bin", | ||||
|     post("/_matrix/federation/v1/get_missing_events/<_>", data = "<body>") | ||||
| )] | ||||
| pub fn get_missing_events_route<'a>( | ||||
|     db: State<'a, Database>, | ||||
|     body: Ruma<get_missing_events::v1::Request<'_>>, | ||||
| ) -> ConduitResult<get_missing_events::v1::Response> { | ||||
|     let mut queued_events = body.latest_events.clone(); | ||||
|     let mut events = Vec::new(); | ||||
| 
 | ||||
|     let mut i = 0; | ||||
|     while i < queued_events.len() && events.len() < u64::from(body.limit) as usize { | ||||
|         if let Some(pdu) = db.rooms.get_pdu_json(&queued_events[i])? { | ||||
|             if body.earliest_events.contains( | ||||
|                 &serde_json::from_value( | ||||
|                     pdu.get("event_id") | ||||
|                         .cloned() | ||||
|                         .ok_or_else(|| Error::bad_database("Event in db has no event_id field."))?, | ||||
|                 ) | ||||
|                 .map_err(|_| Error::bad_database("Invalid event_id field in pdu in db."))?, | ||||
|             ) { | ||||
|                 i += 1; | ||||
|                 continue; | ||||
|             } | ||||
|             queued_events.extend_from_slice( | ||||
|                 &serde_json::from_value::<Vec<EventId>>( | ||||
|                     pdu.get("prev_events").cloned().ok_or_else(|| { | ||||
|                         Error::bad_database("Invalid prev_events field of pdu in db.") | ||||
|                     })?, | ||||
|                 ) | ||||
|                 .map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?, | ||||
|             ); | ||||
|             events.push(PduEvent::to_outgoing_federation_event(pdu)); | ||||
|         } | ||||
|         i += 1; | ||||
|     } | ||||
| 
 | ||||
|     dbg!(&events); | ||||
| 
 | ||||
|     Ok(get_missing_events::v1::Response { events }.into()) | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue