diff --git a/src/client_server/account.rs b/src/client_server/account.rs index 0fc8b28..9e16d90 100644 --- a/src/client_server/account.rs +++ b/src/client_server/account.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, convert::TryInto}; +use std::{collections::BTreeMap, convert::TryInto, sync::Arc}; use super::{DEVICE_ID_LENGTH, SESSION_ID_LENGTH, TOKEN_LENGTH}; use crate::{database::DatabaseGuard, pdu::PduBuilder, utils, ConduitResult, Error, Ruma}; @@ -238,6 +238,16 @@ pub async fn register_route( let room_id = RoomId::new(db.globals.server_name()); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let mut content = ruma::events::room::create::CreateEventContent::new(conduit_user.clone()); content.federate = true; content.predecessor = None; @@ -255,6 +265,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 2. Make conduit bot join @@ -276,6 +287,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 3. Power levels @@ -300,6 +312,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 4.1 Join Rules @@ -317,6 +330,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 4.2 History Visibility @@ -336,6 +350,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 4.3 Guest Access @@ -353,6 +368,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // 6. Events implied by name and topic @@ -372,6 +388,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; db.rooms.build_and_append_pdu( @@ -388,6 +405,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; // Room alias @@ -410,6 +428,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; @@ -433,6 +452,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; db.rooms.build_and_append_pdu( PduBuilder { @@ -452,6 +472,7 @@ pub async fn register_route( &user_id, &room_id, &db, + &mutex_lock, )?; // Send welcome message @@ -470,6 +491,7 @@ pub async fn register_route( &conduit_user, &room_id, &db, + &mutex_lock, )?; } @@ -641,6 +663,16 @@ pub async fn deactivate_route( third_party_invite: None, }; + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -652,6 +684,7 @@ pub async fn deactivate_route( &sender_user, &room_id, &db, + &mutex_lock, )?; } diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 9401b76..a74950b 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -203,6 +203,16 @@ pub async fn kick_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; // TODO: reason + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -214,8 +224,11 @@ pub async fn kick_user_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(kick_user::Response::new().into()) @@ -261,6 +274,16 @@ pub async fn ban_user_route( }, )?; + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -272,8 +295,11 @@ pub async fn ban_user_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(ban_user::Response::new().into()) @@ -310,6 +336,16 @@ pub async fn unban_user_route( event.membership = ruma::events::room::member::MembershipState::Leave; + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomMember, @@ -321,8 +357,11 @@ pub async fn unban_user_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(unban_user::Response::new().into()) @@ -446,6 +485,16 @@ async fn join_room_by_id_helper( ) -> ConduitResult { let sender_user = sender_user.expect("user is authenticated"); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // Ask a remote server if we don't have this room if !db.rooms.exists(&room_id)? && room_id.server_name() != db.globals.server_name() { let mut make_join_response_and_server = Err(Error::BadServerResponse( @@ -649,9 +698,12 @@ async fn join_room_by_id_helper( &sender_user, &room_id, &db, + &mutex_lock, )?; } + drop(mutex_lock); + db.flush().await?; Ok(join_room_by_id::Response::new(room_id.clone()).into()) @@ -721,13 +773,23 @@ async fn validate_and_add_event_id( Ok((event_id, value)) } -pub async fn invite_helper( +pub async fn invite_helper<'a>( sender_user: &UserId, user_id: &UserId, room_id: &RoomId, db: &Database, is_direct: bool, ) -> Result<()> { + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + if user_id.server_name() != db.globals.server_name() { let prev_events = db .rooms @@ -863,6 +925,8 @@ pub async fn invite_helper( ) .expect("event is valid, we just created it"); + drop(mutex_lock); + let invite_room_state = db.rooms.calculate_invite_state(&pdu)?; let response = db .sending @@ -902,16 +966,6 @@ pub async fn invite_helper( ) .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; - let mutex = Arc::clone( - db.globals - .roomid_mutex - .write() - .unwrap() - .entry(room_id.clone()) - .or_default(), - ); - let mutex_lock = mutex.lock().await; - let pdu_id = server_server::handle_incoming_pdu( &origin, &event_id, @@ -932,7 +986,6 @@ pub async fn invite_helper( ErrorKind::InvalidParam, "Could not accept incoming PDU as timeline event.", ))?; - drop(mutex_lock); for server in db .rooms @@ -964,6 +1017,7 @@ pub async fn invite_helper( &sender_user, room_id, &db, + &mutex_lock, )?; Ok(()) diff --git a/src/client_server/message.rs b/src/client_server/message.rs index 7e898b1..3d8218c 100644 --- a/src/client_server/message.rs +++ b/src/client_server/message.rs @@ -10,6 +10,7 @@ use ruma::{ use std::{ collections::BTreeMap, convert::{TryFrom, TryInto}, + sync::Arc, }; #[cfg(feature = "conduit_bin")] @@ -27,6 +28,16 @@ pub async fn send_message_event_route( let sender_user = body.sender_user.as_ref().expect("user is authenticated"); let sender_device = body.sender_device.as_deref(); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // Check if this is a new transaction id if let Some(response) = db.transaction_ids @@ -64,6 +75,7 @@ pub async fn send_message_event_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; db.transaction_ids.add_txnid( @@ -73,6 +85,8 @@ pub async fn send_message_event_route( event_id.as_bytes(), )?; + drop(mutex_lock); + db.flush().await?; Ok(send_message_event::Response::new(event_id).into()) diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs index 5281a4a..d947bbe 100644 --- a/src/client_server/profile.rs +++ b/src/client_server/profile.rs @@ -9,7 +9,7 @@ use ruma::{ events::EventType, serde::Raw, }; -use std::convert::TryInto; +use std::{convert::TryInto, sync::Arc}; #[cfg(feature = "conduit_bin")] use rocket::{get, put}; @@ -69,9 +69,19 @@ pub async fn set_displayname_route( }) .filter_map(|r| r.ok()) { - let _ = db - .rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + + let _ = + db.rooms + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); // Presence update db.rooms.edus.update_presence( @@ -171,9 +181,19 @@ pub async fn set_avatar_url_route( }) .filter_map(|r| r.ok()) { - let _ = db - .rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + + let _ = + db.rooms + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); // Presence update db.rooms.edus.update_presence( diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs index 3db2771..2e4c651 100644 --- a/src/client_server/redact.rs +++ b/src/client_server/redact.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{database::DatabaseGuard, pdu::PduBuilder, ConduitResult, Ruma}; use ruma::{ api::client::r0::redact::redact_event, @@ -18,6 +20,16 @@ pub async fn redact_event_route( ) -> ConduitResult { let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let event_id = db.rooms.build_and_append_pdu( PduBuilder { event_type: EventType::RoomRedaction, @@ -32,8 +44,11 @@ pub async fn redact_event_route( &sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; Ok(redact_event::Response { event_id }.into()) diff --git a/src/client_server/room.rs b/src/client_server/room.rs index 43625fe..f48c5e9 100644 --- a/src/client_server/room.rs +++ b/src/client_server/room.rs @@ -15,7 +15,7 @@ use ruma::{ serde::Raw, RoomAliasId, RoomId, RoomVersionId, }; -use std::{cmp::max, collections::BTreeMap, convert::TryFrom}; +use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc}; #[cfg(feature = "conduit_bin")] use rocket::{get, post}; @@ -33,6 +33,16 @@ pub async fn create_room_route( let room_id = RoomId::new(db.globals.server_name()); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let alias = body .room_alias_name .as_ref() @@ -69,6 +79,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 2. Let the room creator join @@ -90,6 +101,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 3. Power levels @@ -144,6 +156,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 4. Events set by preset @@ -170,6 +183,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 4.2 History Visibility @@ -187,6 +201,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 4.3 Guest Access @@ -212,6 +227,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; // 5. Events listed in initial_state @@ -227,7 +243,7 @@ pub async fn create_room_route( } db.rooms - .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db)?; + .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock)?; } // 6. Events implied by name and topic @@ -248,6 +264,7 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; } @@ -266,10 +283,12 @@ pub async fn create_room_route( &sender_user, &room_id, &db, + &mutex_lock, )?; } // 7. Events implied by invite (and TODO: invite_3pid) + drop(mutex_lock); for user_id in &body.invite { let _ = invite_helper(sender_user, user_id, &room_id, &db, body.is_direct).await; } @@ -340,6 +359,16 @@ pub async fn upgrade_room_route( // Create a replacement room let replacement_room = RoomId::new(db.globals.server_name()); + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(body.room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Fail if the sender does not have the required permissions let tombstone_event_id = db.rooms.build_and_append_pdu( @@ -357,6 +386,7 @@ pub async fn upgrade_room_route( sender_user, &body.room_id, &db, + &mutex_lock, )?; // Get the old room federations status @@ -397,6 +427,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, + &mutex_lock, )?; // Join the new room @@ -418,6 +449,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, + &mutex_lock, )?; // Recommended transferable state events list from the specs @@ -451,6 +483,7 @@ pub async fn upgrade_room_route( sender_user, &replacement_room, &db, + &mutex_lock, )?; } @@ -494,8 +527,11 @@ pub async fn upgrade_room_route( sender_user, &body.room_id, &db, + &mutex_lock, )?; + drop(mutex_lock); + db.flush().await?; // Return the replacement room id diff --git a/src/client_server/state.rs b/src/client_server/state.rs index 68246d5..e0e5d29 100644 --- a/src/client_server/state.rs +++ b/src/client_server/state.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ database::DatabaseGuard, pdu::PduBuilder, ConduitResult, Database, Error, Result, Ruma, }; @@ -257,6 +259,16 @@ pub async fn send_state_event_for_key_helper( } } + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let event_id = db.rooms.build_and_append_pdu( PduBuilder { event_type, @@ -268,6 +280,7 @@ pub async fn send_state_event_for_key_helper( &sender_user, &room_id, &db, + &mutex_lock, )?; Ok(event_id) diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 092c4a9..fe11304 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -189,6 +189,18 @@ async fn sync_helper( for room_id in db.rooms.rooms_joined(&sender_user) { let room_id = room_id?; + // Get and drop the lock to wait for remaining operations to finish + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + drop(mutex_lock); + let mut non_timeline_pdus = db .rooms .pdus_until(&sender_user, &room_id, u64::MAX) @@ -641,6 +653,19 @@ async fn sync_helper( let mut left_rooms = BTreeMap::new(); for result in db.rooms.rooms_left(&sender_user) { let (room_id, left_state_events) = result?; + + // Get and drop the lock to wait for remaining operations to finish + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + drop(mutex_lock); + let left_count = db.rooms.get_left_count(&room_id, &sender_user)?; // Left before last sync @@ -667,6 +692,19 @@ async fn sync_helper( let mut invited_rooms = BTreeMap::new(); for result in db.rooms.rooms_invited(&sender_user) { let (room_id, invite_state_events) = result?; + + // Get and drop the lock to wait for remaining operations to finish + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + drop(mutex_lock); + let invite_count = db.rooms.get_invite_count(&room_id, &sender_user)?; // Invited before last sync diff --git a/src/database/admin.rs b/src/database/admin.rs index cd5fa84..d8b7ae5 100644 --- a/src/database/admin.rs +++ b/src/database/admin.rs @@ -10,7 +10,7 @@ use ruma::{ events::{room::message, EventType}, UserId, }; -use tokio::sync::{RwLock, RwLockReadGuard}; +use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard}; pub enum AdminCommand { RegisterAppservice(serde_yaml::Value), @@ -48,38 +48,51 @@ impl Admin { ) .unwrap(); - if conduit_room.is_none() { - warn!("Conduit instance does not have an #admins room. Logging to that room will not work. Restart Conduit after creating a user to fix this."); - } + let conduit_room = match conduit_room { + None => { + warn!("Conduit instance does not have an #admins room. Logging to that room will not work. Restart Conduit after creating a user to fix this."); + return; + } + Some(r) => r, + }; drop(guard); - let send_message = - |message: message::MessageEventContent, guard: RwLockReadGuard<'_, Database>| { - if let Some(conduit_room) = &conduit_room { - guard - .rooms - .build_and_append_pdu( - PduBuilder { - event_type: EventType::RoomMessage, - content: serde_json::to_value(message) - .expect("event is valid, we just created it"), - unsigned: None, - state_key: None, - redacts: None, - }, - &conduit_user, - &conduit_room, - &guard, - ) - .unwrap(); - } - }; + let send_message = |message: message::MessageEventContent, + guard: RwLockReadGuard<'_, Database>, + mutex_lock: &MutexGuard<'_, ()>| { + guard + .rooms + .build_and_append_pdu( + PduBuilder { + event_type: EventType::RoomMessage, + content: serde_json::to_value(message) + .expect("event is valid, we just created it"), + unsigned: None, + state_key: None, + redacts: None, + }, + &conduit_user, + &conduit_room, + &guard, + mutex_lock, + ) + .unwrap(); + }; loop { tokio::select! { Some(event) = receiver.next() => { let guard = db.read().await; + let mutex = Arc::clone( + guard.globals + .roomid_mutex + .write() + .unwrap() + .entry(conduit_room.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; match event { AdminCommand::RegisterAppservice(yaml) => { @@ -93,15 +106,17 @@ impl Admin { count, appservices.into_iter().filter_map(|r| r.ok()).collect::>().join(", ") ); - send_message(message::MessageEventContent::text_plain(output), guard); + send_message(message::MessageEventContent::text_plain(output), guard, &mutex_lock); } else { - send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard); + send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &mutex_lock); } } AdminCommand::SendMessage(message) => { - send_message(message, guard) + send_message(message, guard, &mutex_lock); } } + + drop(mutex_lock); } } } diff --git a/src/database/globals.rs b/src/database/globals.rs index b6fe9e1..0e72297 100644 --- a/src/database/globals.rs +++ b/src/database/globals.rs @@ -46,6 +46,7 @@ pub struct Globals { pub servername_ratelimiter: Arc, Arc>>>, pub sync_receivers: RwLock), SyncHandle>>, pub roomid_mutex: RwLock>>>, + pub roomid_mutex_federation: RwLock>>>, // this lock will be held longer pub rotate: RotationHandler, } @@ -199,6 +200,7 @@ impl Globals { bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), roomid_mutex: RwLock::new(BTreeMap::new()), + roomid_mutex_federation: RwLock::new(BTreeMap::new()), sync_receivers: RwLock::new(BTreeMap::new()), rotate: RotationHandler::new(), }; diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 060bf45..1542db8 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -2,6 +2,7 @@ mod edus; pub use edus::RoomEdus; use member::MembershipState; +use tokio::sync::MutexGuard; use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; use log::{debug, error, warn}; @@ -1207,6 +1208,7 @@ impl Rooms { sender: &UserId, room_id: &RoomId, db: &Database, + _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex ) -> Result { let PduBuilder { event_type, @@ -1216,7 +1218,6 @@ impl Rooms { redacts, } = pdu_builder; - // TODO: Make sure this isn't called twice in parallel let prev_events = self .get_pdu_leaves(&room_id)? .into_iter() @@ -1790,6 +1791,16 @@ impl Rooms { db, )?; } else { + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + let mut event = serde_json::from_value::>( self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())? .ok_or(Error::BadRequest( @@ -1817,6 +1828,7 @@ impl Rooms { user_id, room_id, db, + &mutex_lock, )?; } diff --git a/src/server_server.rs b/src/server_server.rs index f90caad..fb49d0c 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -640,7 +640,7 @@ pub async fn send_transaction_message_route( let mutex = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_federation .write() .unwrap() .entry(room_id.clone()) @@ -1308,11 +1308,13 @@ pub fn handle_incoming_pdu<'a>( pdu_id = Some( append_incoming_pdu( &db, + &room_id, &incoming_pdu, val, extremities, &state_at_incoming_event, ) + .await .map_err(|_| "Failed to add pdu to db.".to_owned())?, ); debug!("Appended incoming pdu."); @@ -1611,13 +1613,24 @@ pub(crate) async fn fetch_signing_keys( /// Append the incoming event setting the state snapshot to the state from the /// server that sent the event. #[tracing::instrument(skip(db))] -pub(crate) fn append_incoming_pdu( +async fn append_incoming_pdu( db: &Database, + room_id: &RoomId, pdu: &PduEvent, pdu_json: CanonicalJsonObject, new_room_leaves: HashSet, state: &StateMap>, ) -> Result> { + let mutex = Arc::clone( + db.globals + .roomid_mutex + .write() + .unwrap() + .entry(room_id.clone()) + .or_default(), + ); + let mutex_lock = mutex.lock().await; + // We append to state before appending the pdu, so we don't have a moment in time with the // pdu without it's state. This is okay because append_pdu can't fail. db.rooms @@ -1630,6 +1643,8 @@ pub(crate) fn append_incoming_pdu( &db, )?; + drop(mutex_lock); + for appservice in db.appservice.iter_all()?.filter_map(|r| r.ok()) { if let Some(namespaces) = appservice.1.get("namespaces") { let users = namespaces @@ -2145,7 +2160,7 @@ pub async fn create_join_event_route( let mutex = Arc::clone( db.globals - .roomid_mutex + .roomid_mutex_federation .write() .unwrap() .entry(body.room_id.clone())