improvement: refactor state res and fix a few bugs in the process
This commit is contained in:
		
							parent
							
								
									16eed1d8c2
								
							
						
					
					
						commit
						c213769d9f
					
				
					 8 changed files with 651 additions and 833 deletions
				
			
		
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							|  | @ -2105,7 +2105,7 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483" | |||
| [[package]] | ||||
| name = "state-res" | ||||
| version = "0.1.0" | ||||
| source = "git+https://github.com/ruma/state-res?rev=34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488#34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488" | ||||
| source = "git+https://github.com/timokoesters/state-res?branch=improvements#1621a491a9e867a1ad4dff9f2f92b0c1e2d44aa0" | ||||
| dependencies = [ | ||||
|  "itertools 0.10.0", | ||||
|  "log", | ||||
|  |  | |||
|  | @ -23,10 +23,10 @@ ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-a | |||
| #ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks"] } | ||||
| 
 | ||||
| # Used when doing state resolution | ||||
| # state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] } | ||||
| state-res = { git = "https://github.com/timokoesters/state-res", branch = "improvements", features = ["unstable-pre-spec"] } | ||||
| # TODO: remove the gen-eventid feature | ||||
| #state-res = { git = "https://github.com/ruma/state-res", branch = "main", features = ["unstable-pre-spec", "gen-eventid"] } | ||||
| state-res = { git = "https://github.com/ruma/state-res", rev = "34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488", features = ["unstable-pre-spec", "gen-eventid"] } | ||||
| #state-res = { git = "https://github.com/ruma/state-res", rev = "1621a491a9e867a1ad4dff9f2f92b0c1e2d44aa0", features = ["unstable-pre-spec", "gen-eventid"] } | ||||
| #state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] } | ||||
| 
 | ||||
| # Used for long polling and federation sender, should be the same as rocket::tokio | ||||
|  |  | |||
|  | @ -21,11 +21,7 @@ use ruma::{ | |||
|     serde::{to_canonical_value, CanonicalJsonObject, Raw}, | ||||
|     EventId, RoomId, RoomVersionId, ServerName, UserId, | ||||
| }; | ||||
| use std::{ | ||||
|     collections::{BTreeMap, HashMap}, | ||||
|     convert::TryFrom, | ||||
|     sync::Arc, | ||||
| }; | ||||
| use std::{collections::BTreeMap, convert::TryFrom, sync::Arc}; | ||||
| 
 | ||||
| #[cfg(feature = "conduit_bin")] | ||||
| use rocket::{get, post}; | ||||
|  | @ -573,7 +569,7 @@ async fn join_room_by_id_helper( | |||
|         let pdu = PduEvent::from_id_val(&event_id, join_event.clone()) | ||||
|             .map_err(|_| Error::BadServerResponse("Invalid PDU in send_join response."))?; | ||||
| 
 | ||||
|         let mut state = HashMap::new(); | ||||
|         let mut state = BTreeMap::new(); | ||||
| 
 | ||||
|         for pdu in send_join_response | ||||
|             .room_state | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| use super::State; | ||||
| use crate::{ConduitResult, Database, Error, Ruma}; | ||||
| use log::error; | ||||
| use ruma::{ | ||||
|     api::client::r0::sync::sync_events, | ||||
|     events::{room::member::MembershipState, AnySyncEphemeralRoomEvent, EventType}, | ||||
|  | @ -505,7 +506,10 @@ pub async fn sync_events_route( | |||
|                 db.rooms | ||||
|                     .pdu_shortstatehash(&pdu.1.event_id) | ||||
|                     .ok()? | ||||
|                     .ok_or_else(|| Error::bad_database("Pdu in db doesn't have a state hash.")) | ||||
|                     .ok_or_else(|| { | ||||
|                         error!("{:?}", pdu.1); | ||||
|                         Error::bad_database("Pdu in db doesn't have a state hash.") | ||||
|                     }) | ||||
|                     .ok() | ||||
|             }) | ||||
|             .and_then(|shortstatehash| { | ||||
|  |  | |||
|  | @ -215,7 +215,7 @@ pub async fn send_push_notice( | |||
|                             _ => None, | ||||
|                         }) | ||||
|                         .collect::<Vec<_>>(); | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | @ -235,7 +235,7 @@ pub async fn send_push_notice( | |||
|                                 _ => None, | ||||
|                             }) | ||||
|                             .collect::<Vec<_>>(); | ||||
|                         send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                         send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|  | @ -266,8 +266,7 @@ pub async fn send_push_notice( | |||
|                                     _ => None, | ||||
|                                 }) | ||||
|                                 .collect::<Vec<_>>(); | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) | ||||
|                                 .await?; | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|  | @ -292,8 +291,7 @@ pub async fn send_push_notice( | |||
|                                     _ => None, | ||||
|                                 }) | ||||
|                                 .collect::<Vec<_>>(); | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) | ||||
|                                 .await?; | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|  | @ -309,7 +307,7 @@ pub async fn send_push_notice( | |||
|                             _ => None, | ||||
|                         }) | ||||
|                         .collect::<Vec<_>>(); | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | @ -346,8 +344,7 @@ pub async fn send_push_notice( | |||
|                                     _ => None, | ||||
|                                 }) | ||||
|                                 .collect::<Vec<_>>(); | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) | ||||
|                                 .await?; | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|  | @ -372,8 +369,7 @@ pub async fn send_push_notice( | |||
|                                     _ => None, | ||||
|                                 }) | ||||
|                                 .collect::<Vec<_>>(); | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()) | ||||
|                                 .await?; | ||||
|                             send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|  | @ -389,7 +385,7 @@ pub async fn send_push_notice( | |||
|                             _ => None, | ||||
|                         }) | ||||
|                         .collect::<Vec<_>>(); | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | @ -405,7 +401,7 @@ pub async fn send_push_notice( | |||
|                             _ => None, | ||||
|                         }) | ||||
|                         .collect::<Vec<_>>(); | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | @ -421,7 +417,7 @@ pub async fn send_push_notice( | |||
|                             _ => None, | ||||
|                         }) | ||||
|                         .collect::<Vec<_>>(); | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | @ -435,7 +431,7 @@ pub async fn send_push_notice( | |||
|                             _ => None, | ||||
|                         }) | ||||
|                         .collect::<Vec<_>>(); | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | @ -449,7 +445,7 @@ pub async fn send_push_notice( | |||
|                             _ => None, | ||||
|                         }) | ||||
|                         .collect::<Vec<_>>(); | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?; | ||||
|                     send_notice(unread, pusher, tweaks, pdu, db).await?; | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|  | @ -466,7 +462,6 @@ async fn send_notice( | |||
|     tweaks: Vec<Tweak>, | ||||
|     event: &PduEvent, | ||||
|     db: &Database, | ||||
|     name: &str, | ||||
| ) -> Result<()> { | ||||
|     // TODO: email
 | ||||
|     if pusher.kind == Some(PusherKind::Email) { | ||||
|  | @ -514,7 +509,6 @@ async fn send_notice( | |||
|     } | ||||
| 
 | ||||
|     if event_id_only { | ||||
|         error!("SEND PUSH NOTICE `{}`", name); | ||||
|         send_request( | ||||
|             &db.globals, | ||||
|             &url, | ||||
|  | @ -542,7 +536,6 @@ async fn send_notice( | |||
|             .flatten(); | ||||
|         notifi.room_name = room_name.as_deref(); | ||||
| 
 | ||||
|         error!("SEND PUSH NOTICE Full `{}`", name); | ||||
|         send_request( | ||||
|             &db.globals, | ||||
|             &url, | ||||
|  |  | |||
|  | @ -3,27 +3,24 @@ mod edus; | |||
| pub use edus::RoomEdus; | ||||
| 
 | ||||
| use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; | ||||
| use log::{error, warn}; | ||||
| use log::{debug, error, warn}; | ||||
| use regex::Regex; | ||||
| use ring::digest; | ||||
| use ruma::{ | ||||
|     api::client::error::ErrorKind, | ||||
|     events::{ | ||||
|         ignored_user_list, | ||||
|         room::{ | ||||
|             member, message, | ||||
|             power_levels::{self, PowerLevelsEventContent}, | ||||
|         }, | ||||
|         room::{create::CreateEventContent, member, message}, | ||||
|         EventType, | ||||
|     }, | ||||
|     serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw}, | ||||
|     EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId, | ||||
| }; | ||||
| use sled::IVec; | ||||
| use state_res::{event_auth, Event, StateMap}; | ||||
| use state_res::{Event, StateMap}; | ||||
| 
 | ||||
| use std::{ | ||||
|     collections::{BTreeMap, HashMap}, | ||||
|     collections::{BTreeMap, HashMap, HashSet}, | ||||
|     convert::{TryFrom, TryInto}, | ||||
|     mem, | ||||
|     sync::Arc, | ||||
|  | @ -227,26 +224,24 @@ impl Rooms { | |||
|         sender: &UserId, | ||||
|         state_key: Option<&str>, | ||||
|         content: serde_json::Value, | ||||
|     ) -> Result<StateMap<PduEvent>> { | ||||
|     ) -> Result<StateMap<Arc<PduEvent>>> { | ||||
|         let auth_events = state_res::auth_types_for_event( | ||||
|             kind, | ||||
|             sender, | ||||
|             state_key.map(|s| s.to_string()), | ||||
|             content, | ||||
|             content.clone(), | ||||
|         ); | ||||
| 
 | ||||
|         let mut events = StateMap::new(); | ||||
|         for (event_type, state_key) in auth_events { | ||||
|             if let Some(pdu) = self.room_state_get( | ||||
|                 room_id, | ||||
|                 &event_type, | ||||
|                 &state_key | ||||
|                     .as_deref() | ||||
|                     .ok_or_else(|| Error::bad_database("Saved auth event with no state key."))?, | ||||
|             )? { | ||||
|                 events.insert((event_type, state_key), pdu); | ||||
|             if let Some(pdu) = self.room_state_get(room_id, &event_type, &state_key)? { | ||||
|                 events.insert((event_type, state_key), Arc::new(pdu)); | ||||
|             } else { | ||||
|                 warn!("Could not find {} {:?} in state", event_type, state_key); | ||||
|                 // This is okay because when creating a new room some events were not created yet
 | ||||
|                 debug!( | ||||
|                     "{:?}: Could not find {} {:?} in state", | ||||
|                     content, event_type, state_key | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|         Ok(events) | ||||
|  | @ -281,7 +276,7 @@ impl Rooms { | |||
|     pub fn force_state( | ||||
|         &self, | ||||
|         room_id: &RoomId, | ||||
|         state: HashMap<(EventType, String), EventId>, | ||||
|         state: BTreeMap<(EventType, String), EventId>, | ||||
|         globals: &super::globals::Globals, | ||||
|     ) -> Result<()> { | ||||
|         let state_hash = self.calculate_hash( | ||||
|  | @ -293,8 +288,10 @@ impl Rooms { | |||
| 
 | ||||
|         let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? { | ||||
|             Some(shortstatehash) => { | ||||
|                 warn!("state hash already existed?!"); | ||||
|                 shortstatehash.to_vec() | ||||
|                 // State already existed in db
 | ||||
|                 self.roomid_shortstatehash | ||||
|                     .insert(room_id.as_bytes(), &*shortstatehash)?; | ||||
|                 return Ok(()); | ||||
|             } | ||||
|             None => { | ||||
|                 let shortstatehash = globals.next_count()?; | ||||
|  | @ -483,14 +480,11 @@ impl Rooms { | |||
|     } | ||||
| 
 | ||||
|     /// Returns the leaf pdus of a room.
 | ||||
|     pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<Vec<EventId>> { | ||||
|     pub fn get_pdu_leaves(&self, room_id: &RoomId) -> Result<HashSet<EventId>> { | ||||
|         let mut prefix = room_id.as_bytes().to_vec(); | ||||
|         prefix.push(0xff); | ||||
| 
 | ||||
|         let mut events = Vec::new(); | ||||
| 
 | ||||
|         for event in self | ||||
|             .roomid_pduleaves | ||||
|         self.roomid_pduleaves | ||||
|             .scan_prefix(prefix) | ||||
|             .values() | ||||
|             .map(|bytes| { | ||||
|  | @ -501,11 +495,7 @@ impl Rooms { | |||
|                     .map_err(|_| Error::bad_database("EventId in roomid_pduleaves is invalid."))?, | ||||
|                 ) | ||||
|             }) | ||||
|         { | ||||
|             events.push(event?); | ||||
|         } | ||||
| 
 | ||||
|         Ok(events) | ||||
|             .collect() | ||||
|     } | ||||
| 
 | ||||
|     /// Replace the leaves of a room.
 | ||||
|  | @ -761,6 +751,90 @@ impl Rooms { | |||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Generates a new StateHash and associates it with the incoming event.
 | ||||
|     ///
 | ||||
|     /// This adds all current state events (not including the incoming event)
 | ||||
|     /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
 | ||||
|     pub fn set_event_state( | ||||
|         &self, | ||||
|         event_id: &EventId, | ||||
|         state: &StateMap<Arc<PduEvent>>, | ||||
|         globals: &super::globals::Globals, | ||||
|     ) -> Result<()> { | ||||
|         let shorteventid = match self.eventid_shorteventid.get(event_id.as_bytes())? { | ||||
|             Some(shorteventid) => shorteventid.to_vec(), | ||||
|             None => { | ||||
|                 let shorteventid = globals.next_count()?; | ||||
|                 self.eventid_shorteventid | ||||
|                     .insert(event_id.as_bytes(), &shorteventid.to_be_bytes())?; | ||||
|                 self.shorteventid_eventid | ||||
|                     .insert(&shorteventid.to_be_bytes(), event_id.as_bytes())?; | ||||
|                 shorteventid.to_be_bytes().to_vec() | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         let state_hash = self.calculate_hash( | ||||
|             &state | ||||
|                 .values() | ||||
|                 .map(|pdu| pdu.event_id.as_bytes()) | ||||
|                 .collect::<Vec<_>>(), | ||||
|         ); | ||||
| 
 | ||||
|         let shortstatehash = match self.statehash_shortstatehash.get(&state_hash)? { | ||||
|             Some(shortstatehash) => { | ||||
|                 // State already existed in db
 | ||||
|                 self.shorteventid_shortstatehash | ||||
|                     .insert(shorteventid, &*shortstatehash)?; | ||||
|                 return Ok(()); | ||||
|             } | ||||
|             None => { | ||||
|                 let shortstatehash = globals.next_count()?; | ||||
|                 self.statehash_shortstatehash | ||||
|                     .insert(&state_hash, &shortstatehash.to_be_bytes())?; | ||||
|                 shortstatehash.to_be_bytes().to_vec() | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         for ((event_type, state_key), pdu) in state { | ||||
|             let mut statekey = event_type.as_ref().as_bytes().to_vec(); | ||||
|             statekey.push(0xff); | ||||
|             statekey.extend_from_slice(&state_key.as_bytes()); | ||||
| 
 | ||||
|             let shortstatekey = match self.statekey_shortstatekey.get(&statekey)? { | ||||
|                 Some(shortstatekey) => shortstatekey.to_vec(), | ||||
|                 None => { | ||||
|                     let shortstatekey = globals.next_count()?; | ||||
|                     self.statekey_shortstatekey | ||||
|                         .insert(&statekey, &shortstatekey.to_be_bytes())?; | ||||
|                     shortstatekey.to_be_bytes().to_vec() | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             let shorteventid = match self.eventid_shorteventid.get(pdu.event_id.as_bytes())? { | ||||
|                 Some(shorteventid) => shorteventid.to_vec(), | ||||
|                 None => { | ||||
|                     let shorteventid = globals.next_count()?; | ||||
|                     self.eventid_shorteventid | ||||
|                         .insert(pdu.event_id.as_bytes(), &shorteventid.to_be_bytes())?; | ||||
|                     self.shorteventid_eventid | ||||
|                         .insert(&shorteventid.to_be_bytes(), pdu.event_id.as_bytes())?; | ||||
|                     shorteventid.to_be_bytes().to_vec() | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             let mut state_id = shortstatehash.clone(); | ||||
|             state_id.extend_from_slice(&shortstatekey); | ||||
| 
 | ||||
|             self.stateid_shorteventid | ||||
|                 .insert(&*state_id, &*shorteventid)?; | ||||
|         } | ||||
| 
 | ||||
|         self.shorteventid_shortstatehash | ||||
|             .insert(shorteventid, &*shortstatehash)?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Generates a new StateHash and associates it with the incoming event.
 | ||||
|     ///
 | ||||
|     /// This adds all current state events (not including the incoming event)
 | ||||
|  | @ -900,8 +974,37 @@ impl Rooms { | |||
|             redacts, | ||||
|         } = pdu_builder; | ||||
|         // TODO: Make sure this isn't called twice in parallel
 | ||||
|         let mut prev_events = self.get_pdu_leaves(&room_id)?; | ||||
|         prev_events.truncate(20); | ||||
|         let prev_events = self | ||||
|             .get_pdu_leaves(&room_id)? | ||||
|             .into_iter() | ||||
|             .take(20) | ||||
|             .collect::<Vec<_>>(); | ||||
| 
 | ||||
|         let create_event = self.room_state_get(&room_id, &EventType::RoomCreate, "")?; | ||||
| 
 | ||||
|         let create_event_content = create_event | ||||
|             .as_ref() | ||||
|             .map(|create_event| { | ||||
|                 Ok::<_, Error>( | ||||
|                     serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone()) | ||||
|                         .expect("Raw::from_value always works.") | ||||
|                         .deserialize() | ||||
|                         .map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?, | ||||
|                 ) | ||||
|             }) | ||||
|             .transpose()?; | ||||
| 
 | ||||
|         let create_prev_event = if prev_events.len() == 1 | ||||
|             && Some(&prev_events[0]) == create_event.as_ref().map(|c| &c.event_id) | ||||
|         { | ||||
|             create_event.map(Arc::new) | ||||
|         } else { | ||||
|             None | ||||
|         }; | ||||
| 
 | ||||
|         let room_version = create_event_content.map_or(RoomVersionId::Version6, |create_event| { | ||||
|             create_event.room_version | ||||
|         }); | ||||
| 
 | ||||
|         let auth_events = self.get_auth_events( | ||||
|             &room_id, | ||||
|  | @ -911,118 +1014,6 @@ impl Rooms { | |||
|             content.clone(), | ||||
|         )?; | ||||
| 
 | ||||
|         // Is the event authorized?
 | ||||
|         if let Some(state_key) = &state_key { | ||||
|             let power_levels = self | ||||
|                 .room_state_get(&room_id, &EventType::RoomPowerLevels, "")? | ||||
|                 .map_or_else( | ||||
|                     || { | ||||
|                         Ok::<_, Error>(power_levels::PowerLevelsEventContent { | ||||
|                             ban: 50.into(), | ||||
|                             events: BTreeMap::new(), | ||||
|                             events_default: 0.into(), | ||||
|                             invite: 50.into(), | ||||
|                             kick: 50.into(), | ||||
|                             redact: 50.into(), | ||||
|                             state_default: 0.into(), | ||||
|                             users: BTreeMap::new(), | ||||
|                             users_default: 0.into(), | ||||
|                             notifications: | ||||
|                                 ruma::events::room::power_levels::NotificationPowerLevels { | ||||
|                                     room: 50.into(), | ||||
|                                 }, | ||||
|                         }) | ||||
|                     }, | ||||
|                     |power_levels| { | ||||
|                         Ok(serde_json::from_value::<Raw<PowerLevelsEventContent>>( | ||||
|                             power_levels.content, | ||||
|                         ) | ||||
|                         .expect("Raw::from_value always works.") | ||||
|                         .deserialize() | ||||
|                         .map_err(|_| Error::bad_database("Invalid PowerLevels event in db."))?) | ||||
|                     }, | ||||
|                 )?; | ||||
|             let sender_membership = self | ||||
|                 .room_state_get(&room_id, &EventType::RoomMember, &sender.to_string())? | ||||
|                 .map_or(Ok::<_, Error>(member::MembershipState::Leave), |pdu| { | ||||
|                     Ok( | ||||
|                         serde_json::from_value::<Raw<member::MemberEventContent>>(pdu.content) | ||||
|                             .expect("Raw::from_value always works.") | ||||
|                             .deserialize() | ||||
|                             .map_err(|_| Error::bad_database("Invalid Member event in db."))? | ||||
|                             .membership, | ||||
|                     ) | ||||
|                 })?; | ||||
| 
 | ||||
|             let sender_power = power_levels.users.get(&sender).map_or_else( | ||||
|                 || { | ||||
|                     if sender_membership != member::MembershipState::Join { | ||||
|                         None | ||||
|                     } else { | ||||
|                         Some(&power_levels.users_default) | ||||
|                     } | ||||
|                 }, | ||||
|                 // If it's okay, wrap with Some(_)
 | ||||
|                 Some, | ||||
|             ); | ||||
| 
 | ||||
|             // Is the event allowed?
 | ||||
|             #[allow(clippy::blocks_in_if_conditions)] | ||||
|             if !match event_type { | ||||
|                 EventType::RoomEncryption => { | ||||
|                     // Only allow encryption events if it's allowed in the config
 | ||||
|                     db.globals.allow_encryption() | ||||
|                 } | ||||
|                 EventType::RoomMember => { | ||||
|                     let prev_event = self | ||||
|                         .get_pdu(prev_events.get(0).ok_or(Error::BadRequest( | ||||
|                             ErrorKind::Unknown, | ||||
|                             "Membership can't be the first event", | ||||
|                         ))?)? | ||||
|                         .map(Arc::new); | ||||
|                     event_auth::valid_membership_change( | ||||
|                         Some(state_key.as_str()), | ||||
|                         &sender, | ||||
|                         content.clone(), | ||||
|                         prev_event, | ||||
|                         None, // TODO: third party invite
 | ||||
|                         &auth_events | ||||
|                             .iter() | ||||
|                             .map(|((ty, key), pdu)| { | ||||
|                                 Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone()))) | ||||
|                             }) | ||||
|                             .collect::<Result<StateMap<_>>>()?, | ||||
|                     ) | ||||
|                     .map_err(|e| { | ||||
|                         log::error!("{}", e); | ||||
|                         Error::Conflict("Found incoming PDU with invalid data.") | ||||
|                     })? | ||||
|                 } | ||||
|                 EventType::RoomCreate => prev_events.is_empty(), | ||||
|                 // Not allow any of the following events if the sender is not joined.
 | ||||
|                 _ if sender_membership != member::MembershipState::Join => false, | ||||
|                 _ => { | ||||
|                     // TODO
 | ||||
|                     sender_power.unwrap_or(&power_levels.users_default) | ||||
|                         >= &power_levels.state_default | ||||
|                 } | ||||
|             } { | ||||
|                 error!("Unauthorized {}", event_type); | ||||
|                 // Not authorized
 | ||||
|                 return Err(Error::BadRequest( | ||||
|                     ErrorKind::Forbidden, | ||||
|                     "Event is not authorized", | ||||
|                 )); | ||||
|             } | ||||
|         } else if !self.is_joined(&sender, &room_id)? { | ||||
|             // TODO: auth rules apply to all events, not only those with a state key
 | ||||
|             error!("Unauthorized {}", event_type); | ||||
|             return Err(Error::BadRequest( | ||||
|                 ErrorKind::Forbidden, | ||||
|                 "Event is not authorized", | ||||
|             )); | ||||
|         } | ||||
| 
 | ||||
|         // Our depth is the maximum depth of prev_events + 1
 | ||||
|         let depth = prev_events | ||||
|             .iter() | ||||
|  | @ -1057,8 +1048,8 @@ impl Rooms { | |||
|                 .try_into() | ||||
|                 .map_err(|_| Error::bad_database("Depth is invalid"))?, | ||||
|             auth_events: auth_events | ||||
|                 .into_iter() | ||||
|                 .map(|(_, pdu)| pdu.event_id) | ||||
|                 .iter() | ||||
|                 .map(|(_, pdu)| pdu.event_id.clone()) | ||||
|                 .collect(), | ||||
|             redacts, | ||||
|             unsigned, | ||||
|  | @ -1068,6 +1059,23 @@ impl Rooms { | |||
|             signatures: BTreeMap::new(), | ||||
|         }; | ||||
| 
 | ||||
|         if !state_res::auth_check( | ||||
|             &room_version, | ||||
|             &Arc::new(pdu.clone()), | ||||
|             create_prev_event, | ||||
|             &auth_events, | ||||
|             None, // TODO: third_party_invite
 | ||||
|         ) | ||||
|         .map_err(|e| { | ||||
|             error!("{:?}", e); | ||||
|             Error::bad_database("Auth check failed.") | ||||
|         })? { | ||||
|             return Err(Error::BadRequest( | ||||
|                 ErrorKind::InvalidParam, | ||||
|                 "Event is not authorized.", | ||||
|             )); | ||||
|         } | ||||
| 
 | ||||
|         // Hash and sign
 | ||||
|         let mut pdu_json = | ||||
|             utils::to_canonical_object(&pdu).expect("event is valid, we just created it"); | ||||
|  |  | |||
|  | @ -175,8 +175,7 @@ impl Sending { | |||
|                                     // servercurrentpdus with the prefix should be empty now
 | ||||
|                                 } | ||||
|                             } | ||||
|                             Err((outgoing_kind, e)) => { | ||||
|                                 info!("Couldn't send transaction to {:?}\n{}", outgoing_kind, e); | ||||
|                             Err((outgoing_kind, _)) => { | ||||
|                                 let mut prefix = match &outgoing_kind { | ||||
|                                     OutgoingKind::Appservice(serv) => { | ||||
|                                         let mut p = b"+".to_vec(); | ||||
|  | @ -217,7 +216,7 @@ impl Sending { | |||
| 
 | ||||
|                             let exponential_backoff = |(tries, instant): &(u32, Instant)| { | ||||
|                                 // Fail if a request has failed recently (exponential backoff)
 | ||||
|                                 let mut min_elapsed_duration = Duration::from_secs(60) * (*tries) * (*tries); | ||||
|                                 let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries); | ||||
|                                 if min_elapsed_duration > Duration::from_secs(60*60*24) { | ||||
|                                     min_elapsed_duration = Duration::from_secs(60*60*24); | ||||
|                                 } | ||||
|  | @ -261,6 +260,8 @@ impl Sending { | |||
|                                 servercurrentpdus.insert(&key, &[]).unwrap(); | ||||
|                                 servernamepduids.remove(&key).unwrap(); | ||||
| 
 | ||||
|                                 last_failed_try.remove(&outgoing_kind); | ||||
| 
 | ||||
|                                 futures.push( | ||||
|                                     Self::handle_event( | ||||
|                                         outgoing_kind, | ||||
|  |  | |||
							
								
								
									
										1102
									
								
								src/server_server.rs
									
									
									
									
									
								
							
							
						
						
									
										1102
									
								
								src/server_server.rs
									
									
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							
		Loading…
	
		Reference in a new issue