Append state event that pass resolution to DB, update to tokio 1.1
This commit is contained in:
		
							parent
							
								
									e0453e2348
								
							
						
					
					
						commit
						6fd3e1d1dd
					
				
					 6 changed files with 298 additions and 476 deletions
				
			
		
							
								
								
									
										630
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										630
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							
							
								
								
									
										12
									
								
								Cargo.toml
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								Cargo.toml
									
									
									
									
									
								
							|  | @ -18,24 +18,24 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "c24f15c18 | ||||||
| #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } | #rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] } | ||||||
| 
 | 
 | ||||||
| # Used for matrix spec type definitions and helpers | # Used for matrix spec type definitions and helpers | ||||||
| ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "0635b407290abf5f34d726e1e690c92c07c738e5" } | ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "bba442580d6cd7ed990b2b63387eed2238cbadc8" } | ||||||
| # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" } | # ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" } | ||||||
| # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } | # ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } | ||||||
| 
 | 
 | ||||||
| # Used when doing state resolution | # Used when doing state resolution | ||||||
| # state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } | # state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } | ||||||
| # TODO: remove the gen-eventid feature | # TODO: remove the gen-eventid feature | ||||||
| state-res = { git = "https://github.com/ruma/state-res", branch = "no-db", features = ["unstable-pre-spec", "gen-eventid"] } | state-res = { git = "https://github.com/ruma/state-res", rev = "791c66d73cf064d09db0cdf767d5fef43a343425", features = ["unstable-pre-spec", "gen-eventid"] } | ||||||
| # state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } | # state-res = { path = "../../state-res", features = ["unstable-pre-spec", "gen-eventid"] } | ||||||
| 
 | 
 | ||||||
| # Used for long polling and federation sender, should be the same as rocket::tokio | # Used for long polling and federation sender, should be the same as rocket::tokio | ||||||
| tokio = { version = "1.0.2", features = ["macros", "time"] } | tokio = { version = "1.1.0", features = ["macros", "time", "sync"] } | ||||||
| # Used for storing data permanently | # Used for storing data permanently | ||||||
| sled = { version = "0.34.6", default-features = false } | sled = { version = "0.34.6", default-features = false } | ||||||
| # Used for emitting log entries | # Used for emitting log entries | ||||||
| log = "0.4.11" | log = "0.4.11" | ||||||
| # Used for rocket<->ruma conversions | # Used for rocket<->ruma conversions | ||||||
| http = "0.2.1" | http = "0.2.3" | ||||||
| # Used to find data directory for default db path | # Used to find data directory for default db path | ||||||
| directories = "3.0.1" | directories = "3.0.1" | ||||||
| 
 | 
 | ||||||
|  | @ -50,7 +50,7 @@ rand = "0.7.3" | ||||||
| # Used to hash passwords | # Used to hash passwords | ||||||
| rust-argon2 = "0.8.3" | rust-argon2 = "0.8.3" | ||||||
| # Used to send requests | # Used to send requests | ||||||
| reqwest = "0.10.9" | reqwest = "0.11.0" | ||||||
| # Used for conduit::Error type | # Used for conduit::Error type | ||||||
| thiserror = "1.0.22" | thiserror = "1.0.22" | ||||||
| # Used to generate thumbnails for images | # Used to generate thumbnails for images | ||||||
|  | @ -60,7 +60,7 @@ base64 = "0.13.0" | ||||||
| # Used when hashing the state | # Used when hashing the state | ||||||
| ring = "0.16.19" | ring = "0.16.19" | ||||||
| # Used when querying the SRV record of other servers | # Used when querying the SRV record of other servers | ||||||
| trust-dns-resolver = "0.19.6" | trust-dns-resolver = "0.20.0" | ||||||
| # Used to find matching events for appservices | # Used to find matching events for appservices | ||||||
| regex = "1.4.2" | regex = "1.4.2" | ||||||
| # jwt jsonwebtokens | # jwt jsonwebtokens | ||||||
|  |  | ||||||
|  | @ -701,7 +701,7 @@ pub async fn sync_events_route( | ||||||
|         let delay = tokio::time::sleep(duration); |         let delay = tokio::time::sleep(duration); | ||||||
|         tokio::pin!(delay); |         tokio::pin!(delay); | ||||||
|         tokio::select! { |         tokio::select! { | ||||||
|             _ = &mut delay => {} |             _ = &mut delay, if delay.is_elapsed() => {} | ||||||
|             _ = watcher => {} |             _ = watcher => {} | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -106,8 +106,7 @@ impl Database { | ||||||
|                 db.open_tree("global")?, |                 db.open_tree("global")?, | ||||||
|                 db.open_tree("servertimeout_signingkey")?, |                 db.open_tree("servertimeout_signingkey")?, | ||||||
|                 config, |                 config, | ||||||
|             ) |             )?, | ||||||
|             .await?, |  | ||||||
|             users: users::Users { |             users: users::Users { | ||||||
|                 userid_password: db.open_tree("userid_password")?, |                 userid_password: db.open_tree("userid_password")?, | ||||||
|                 userid_displayname: db.open_tree("userid_displayname")?, |                 userid_displayname: db.open_tree("userid_displayname")?, | ||||||
|  |  | ||||||
|  | @ -27,11 +27,7 @@ pub struct Globals { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Globals { | impl Globals { | ||||||
|     pub async fn load( |     pub fn load(globals: sled::Tree, server_keys: sled::Tree, config: Config) -> Result<Self> { | ||||||
|         globals: sled::Tree, |  | ||||||
|         server_keys: sled::Tree, |  | ||||||
|         config: Config, |  | ||||||
|     ) -> Result<Self> { |  | ||||||
|         let bytes = &*globals |         let bytes = &*globals | ||||||
|             .update_and_fetch("keypair", utils::generate_keypair)? |             .update_and_fetch("keypair", utils::generate_keypair)? | ||||||
|             .expect("utils::generate_keypair always returns Some"); |             .expect("utils::generate_keypair always returns Some"); | ||||||
|  | @ -83,11 +79,9 @@ impl Globals { | ||||||
|             config, |             config, | ||||||
|             keypair: Arc::new(keypair), |             keypair: Arc::new(keypair), | ||||||
|             reqwest_client, |             reqwest_client, | ||||||
|             dns_resolver: TokioAsyncResolver::tokio_from_system_conf() |             dns_resolver: TokioAsyncResolver::tokio_from_system_conf().map_err(|_| { | ||||||
|                 .await |                 Error::bad_config("Failed to set up trust dns resolver with system config.") | ||||||
|                 .map_err(|_| { |             })?, | ||||||
|                     Error::bad_config("Failed to set up trust dns resolver with system config.") |  | ||||||
|                 })?, |  | ||||||
|             actual_destination_cache: Arc::new(RwLock::new(HashMap::new())), |             actual_destination_cache: Arc::new(RwLock::new(HashMap::new())), | ||||||
|             jwt_decoding_key, |             jwt_decoding_key, | ||||||
|             servertimeout_signingkey: server_keys, |             servertimeout_signingkey: server_keys, | ||||||
|  |  | ||||||
|  | @ -25,7 +25,7 @@ use ruma::{ | ||||||
| }; | }; | ||||||
| use state_res::{Event, EventMap, StateMap}; | use state_res::{Event, EventMap, StateMap}; | ||||||
| use std::{ | use std::{ | ||||||
|     collections::{BTreeMap, BTreeSet}, |     collections::{BTreeMap, BTreeSet, HashMap}, | ||||||
|     convert::TryFrom, |     convert::TryFrom, | ||||||
|     fmt::Debug, |     fmt::Debug, | ||||||
|     future::Future, |     future::Future, | ||||||
|  | @ -839,7 +839,7 @@ pub async fn send_transaction_message_route<'a>( | ||||||
|                     .map(|(_, pdu)| (pdu.event_id().clone(), pdu)), |                     .map(|(_, pdu)| (pdu.event_id().clone(), pdu)), | ||||||
|             ); |             ); | ||||||
| 
 | 
 | ||||||
|             match state_res::StateResolution::resolve( |             let res = match state_res::StateResolution::resolve( | ||||||
|                 &pdu.room_id, |                 &pdu.room_id, | ||||||
|                 &RoomVersionId::Version6, |                 &RoomVersionId::Version6, | ||||||
|                 &fork_states |                 &fork_states | ||||||
|  | @ -856,10 +856,7 @@ pub async fn send_transaction_message_route<'a>( | ||||||
|                     .collect(), |                     .collect(), | ||||||
|                 &mut auth_cache, |                 &mut auth_cache, | ||||||
|             ) { |             ) { | ||||||
|                 Ok(res) => res |                 Ok(res) => res, | ||||||
|                     .into_iter() |  | ||||||
|                     .map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap()))) |  | ||||||
|                     .collect(), |  | ||||||
|                 Err(_) => { |                 Err(_) => { | ||||||
|                     resolved_map.insert( |                     resolved_map.insert( | ||||||
|                         pdu.event_id().clone(), |                         pdu.event_id().clone(), | ||||||
|  | @ -867,7 +864,29 @@ pub async fn send_transaction_message_route<'a>( | ||||||
|                     ); |                     ); | ||||||
|                     continue 'main_pdu_loop; |                     continue 'main_pdu_loop; | ||||||
|                 } |                 } | ||||||
|  |             }; | ||||||
|  |             let mut resolved = BTreeMap::new(); | ||||||
|  |             for (k, id) in res { | ||||||
|  |                 // We should know of the event but just incase
 | ||||||
|  |                 let pdu = match auth_cache.get(&id) { | ||||||
|  |                     Some(pdu) => pdu.clone(), | ||||||
|  |                     None => { | ||||||
|  |                         match fetch_events(&db, server_name, &pub_key_map, &[id], &mut auth_cache) | ||||||
|  |                             .await | ||||||
|  |                             .map(|mut vec| vec.pop()) | ||||||
|  |                         { | ||||||
|  |                             Ok(Some(aev)) => aev, | ||||||
|  |                             _ => { | ||||||
|  |                                 resolved_map | ||||||
|  |                                     .insert(event_id.clone(), Err("Failed to fetch event".into())); | ||||||
|  |                                 continue 'main_pdu_loop; | ||||||
|  |                             } | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |                 }; | ||||||
|  |                 resolved.insert(k, pdu); | ||||||
|             } |             } | ||||||
|  |             resolved | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|         // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
 |         // Add the event to the DB and update the forward extremities (via roomid_pduleaves).
 | ||||||
|  | @ -1199,37 +1218,67 @@ fn append_incoming_pdu( | ||||||
|     new_room_leaves: &[EventId], |     new_room_leaves: &[EventId], | ||||||
|     state: Option<StateMap<Arc<PduEvent>>>, |     state: Option<StateMap<Arc<PduEvent>>>, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|  |     // Update the state of the room if needed
 | ||||||
|  |     // We can tell if we need to do this based on wether state resolution took place or not
 | ||||||
|  |     if let Some(state) = state { | ||||||
|  |         let mut new_state = HashMap::new(); | ||||||
|  |         for ((ev_type, state_k), pdu) in state { | ||||||
|  |             match db.rooms.get_pdu_id(pdu.event_id())? { | ||||||
|  |                 Some(pduid) => { | ||||||
|  |                     new_state.insert( | ||||||
|  |                         ( | ||||||
|  |                             ev_type, | ||||||
|  |                             state_k.ok_or_else(|| { | ||||||
|  |                                 Error::Conflict("State contained non state event") | ||||||
|  |                             })?, | ||||||
|  |                         ), | ||||||
|  |                         pduid.to_vec(), | ||||||
|  |                     ); | ||||||
|  |                 } | ||||||
|  |                 None => { | ||||||
|  |                     let count = db.globals.next_count()?; | ||||||
|  |                     let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||||
|  |                     pdu_id.push(0xff); | ||||||
|  |                     pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||||
|  | 
 | ||||||
|  |                     // TODO: can we use are current state if we just add this event to the end of our
 | ||||||
|  |                     // pduid_pdu tree??
 | ||||||
|  |                     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||||
|  | 
 | ||||||
|  |                     db.rooms.append_pdu( | ||||||
|  |                         &*pdu, | ||||||
|  |                         utils::to_canonical_object(&*pdu).expect("Pdu is valid canonical object"), | ||||||
|  |                         count, | ||||||
|  |                         pdu_id.clone().into(), | ||||||
|  |                         &new_room_leaves, | ||||||
|  |                         &db, | ||||||
|  |                     )?; | ||||||
|  |                     // TODO: is this ok...
 | ||||||
|  |                     db.rooms.set_room_state(&pdu.room_id, &statehashid)?; | ||||||
|  |                     new_state.insert( | ||||||
|  |                         ( | ||||||
|  |                             ev_type, | ||||||
|  |                             state_k.ok_or_else(|| { | ||||||
|  |                                 Error::Conflict("State contained non state event") | ||||||
|  |                             })?, | ||||||
|  |                         ), | ||||||
|  |                         pdu_id.to_vec(), | ||||||
|  |                     ); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         info!("Force update of state for {:?}", pdu); | ||||||
|  | 
 | ||||||
|  |         db.rooms | ||||||
|  |             .force_state(pdu.room_id(), new_state, &db.globals)?; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     let count = db.globals.next_count()?; |     let count = db.globals.next_count()?; | ||||||
|     let mut pdu_id = pdu.room_id.as_bytes().to_vec(); |     let mut pdu_id = pdu.room_id.as_bytes().to_vec(); | ||||||
|     pdu_id.push(0xff); |     pdu_id.push(0xff); | ||||||
|     pdu_id.extend_from_slice(&count.to_be_bytes()); |     pdu_id.extend_from_slice(&count.to_be_bytes()); | ||||||
| 
 | 
 | ||||||
|     // Update the state of the room if needed
 |  | ||||||
|     // We can tell if we need to do this based on wether state resolution took place or not
 |  | ||||||
|     if let Some(state) = state { |  | ||||||
|         let new = state |  | ||||||
|             .into_iter() |  | ||||||
|             .map(|((ev, k), pdu)| { |  | ||||||
|                 Ok(( |  | ||||||
|                     ( |  | ||||||
|                         ev, |  | ||||||
|                         k.ok_or_else(|| Error::Conflict("State contained non state event"))?, |  | ||||||
|                     ), |  | ||||||
|                     db.rooms |  | ||||||
|                         .get_pdu_id(pdu.event_id()) |  | ||||||
|                         .ok() |  | ||||||
|                         .flatten() |  | ||||||
|                         .ok_or_else(|| Error::Conflict("Resolved state contained unknown event"))? |  | ||||||
|                         .to_vec(), |  | ||||||
|                 )) |  | ||||||
|             }) |  | ||||||
|             .collect::<Result<_>>()?; |  | ||||||
| 
 |  | ||||||
|         info!("Force update of state for {:?}", pdu); |  | ||||||
| 
 |  | ||||||
|         db.rooms.force_state(pdu.room_id(), new, &db.globals)?; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     // We append to state before appending the pdu, so we don't have a moment in time with the
 |     // We append to state before appending the pdu, so we don't have a moment in time with the
 | ||||||
|     // pdu without it's state. This is okay because append_pdu can't fail.
 |     // pdu without it's state. This is okay because append_pdu can't fail.
 | ||||||
|     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; |     let statehashid = db.rooms.append_to_state(&pdu_id, &pdu, &db.globals)?; | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue