improvement: locks
This commit is contained in:
		
							parent
							
								
									e15e6d4405
								
							
						
					
					
						commit
						e12b1ff863
					
				
					 12 changed files with 321 additions and 54 deletions
				
			
		|  | @ -1,4 +1,4 @@ | |||
| use std::{collections::BTreeMap, convert::TryInto}; | ||||
| use std::{collections::BTreeMap, convert::TryInto, sync::Arc}; | ||||
| 
 | ||||
| use super::{DEVICE_ID_LENGTH, SESSION_ID_LENGTH, TOKEN_LENGTH}; | ||||
| use crate::{database::DatabaseGuard, pdu::PduBuilder, utils, ConduitResult, Error, Ruma}; | ||||
|  | @ -238,6 +238,16 @@ pub async fn register_route( | |||
| 
 | ||||
|         let room_id = RoomId::new(db.globals.server_name()); | ||||
| 
 | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|         let mut content = ruma::events::room::create::CreateEventContent::new(conduit_user.clone()); | ||||
|         content.federate = true; | ||||
|         content.predecessor = None; | ||||
|  | @ -255,6 +265,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // 2. Make conduit bot join
 | ||||
|  | @ -276,6 +287,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // 3. Power levels
 | ||||
|  | @ -300,6 +312,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // 4.1 Join Rules
 | ||||
|  | @ -317,6 +330,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // 4.2 History Visibility
 | ||||
|  | @ -336,6 +350,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // 4.3 Guest Access
 | ||||
|  | @ -353,6 +368,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // 6. Events implied by name and topic
 | ||||
|  | @ -372,6 +388,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         db.rooms.build_and_append_pdu( | ||||
|  | @ -388,6 +405,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // Room alias
 | ||||
|  | @ -410,6 +428,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         db.rooms.set_alias(&alias, Some(&room_id), &db.globals)?; | ||||
|  | @ -433,6 +452,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
|         db.rooms.build_and_append_pdu( | ||||
|             PduBuilder { | ||||
|  | @ -452,6 +472,7 @@ pub async fn register_route( | |||
|             &user_id, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
| 
 | ||||
|         // Send welcome message
 | ||||
|  | @ -470,6 +491,7 @@ pub async fn register_route( | |||
|             &conduit_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
|     } | ||||
| 
 | ||||
|  | @ -641,6 +663,16 @@ pub async fn deactivate_route( | |||
|             third_party_invite: None, | ||||
|         }; | ||||
| 
 | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|         db.rooms.build_and_append_pdu( | ||||
|             PduBuilder { | ||||
|                 event_type: EventType::RoomMember, | ||||
|  | @ -652,6 +684,7 @@ pub async fn deactivate_route( | |||
|             &sender_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -203,6 +203,16 @@ pub async fn kick_user_route( | |||
|     event.membership = ruma::events::room::member::MembershipState::Leave; | ||||
|     // TODO: reason
 | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(body.room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     db.rooms.build_and_append_pdu( | ||||
|         PduBuilder { | ||||
|             event_type: EventType::RoomMember, | ||||
|  | @ -214,8 +224,11 @@ pub async fn kick_user_route( | |||
|         &sender_user, | ||||
|         &body.room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     Ok(kick_user::Response::new().into()) | ||||
|  | @ -261,6 +274,16 @@ pub async fn ban_user_route( | |||
|             }, | ||||
|         )?; | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(body.room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     db.rooms.build_and_append_pdu( | ||||
|         PduBuilder { | ||||
|             event_type: EventType::RoomMember, | ||||
|  | @ -272,8 +295,11 @@ pub async fn ban_user_route( | |||
|         &sender_user, | ||||
|         &body.room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     Ok(ban_user::Response::new().into()) | ||||
|  | @ -310,6 +336,16 @@ pub async fn unban_user_route( | |||
| 
 | ||||
|     event.membership = ruma::events::room::member::MembershipState::Leave; | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(body.room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     db.rooms.build_and_append_pdu( | ||||
|         PduBuilder { | ||||
|             event_type: EventType::RoomMember, | ||||
|  | @ -321,8 +357,11 @@ pub async fn unban_user_route( | |||
|         &sender_user, | ||||
|         &body.room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     Ok(unban_user::Response::new().into()) | ||||
|  | @ -446,6 +485,16 @@ async fn join_room_by_id_helper( | |||
| ) -> ConduitResult<join_room_by_id::Response> { | ||||
|     let sender_user = sender_user.expect("user is authenticated"); | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     // Ask a remote server if we don't have this room
 | ||||
|     if !db.rooms.exists(&room_id)? && room_id.server_name() != db.globals.server_name() { | ||||
|         let mut make_join_response_and_server = Err(Error::BadServerResponse( | ||||
|  | @ -649,9 +698,12 @@ async fn join_room_by_id_helper( | |||
|             &sender_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
|     } | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     Ok(join_room_by_id::Response::new(room_id.clone()).into()) | ||||
|  | @ -721,13 +773,23 @@ async fn validate_and_add_event_id( | |||
|     Ok((event_id, value)) | ||||
| } | ||||
| 
 | ||||
| pub async fn invite_helper( | ||||
| pub async fn invite_helper<'a>( | ||||
|     sender_user: &UserId, | ||||
|     user_id: &UserId, | ||||
|     room_id: &RoomId, | ||||
|     db: &Database, | ||||
|     is_direct: bool, | ||||
| ) -> Result<()> { | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     if user_id.server_name() != db.globals.server_name() { | ||||
|         let prev_events = db | ||||
|             .rooms | ||||
|  | @ -863,6 +925,8 @@ pub async fn invite_helper( | |||
|         ) | ||||
|         .expect("event is valid, we just created it"); | ||||
| 
 | ||||
|         drop(mutex_lock); | ||||
| 
 | ||||
|         let invite_room_state = db.rooms.calculate_invite_state(&pdu)?; | ||||
|         let response = db | ||||
|             .sending | ||||
|  | @ -902,16 +966,6 @@ pub async fn invite_helper( | |||
|         ) | ||||
|         .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?; | ||||
| 
 | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|         let pdu_id = server_server::handle_incoming_pdu( | ||||
|             &origin, | ||||
|             &event_id, | ||||
|  | @ -932,7 +986,6 @@ pub async fn invite_helper( | |||
|             ErrorKind::InvalidParam, | ||||
|             "Could not accept incoming PDU as timeline event.", | ||||
|         ))?; | ||||
|         drop(mutex_lock); | ||||
| 
 | ||||
|         for server in db | ||||
|             .rooms | ||||
|  | @ -964,6 +1017,7 @@ pub async fn invite_helper( | |||
|         &sender_user, | ||||
|         room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     Ok(()) | ||||
|  |  | |||
|  | @ -10,6 +10,7 @@ use ruma::{ | |||
| use std::{ | ||||
|     collections::BTreeMap, | ||||
|     convert::{TryFrom, TryInto}, | ||||
|     sync::Arc, | ||||
| }; | ||||
| 
 | ||||
| #[cfg(feature = "conduit_bin")] | ||||
|  | @ -27,6 +28,16 @@ pub async fn send_message_event_route( | |||
|     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
|     let sender_device = body.sender_device.as_deref(); | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(body.room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     // Check if this is a new transaction id
 | ||||
|     if let Some(response) = | ||||
|         db.transaction_ids | ||||
|  | @ -64,6 +75,7 @@ pub async fn send_message_event_route( | |||
|         &sender_user, | ||||
|         &body.room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     db.transaction_ids.add_txnid( | ||||
|  | @ -73,6 +85,8 @@ pub async fn send_message_event_route( | |||
|         event_id.as_bytes(), | ||||
|     )?; | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     Ok(send_message_event::Response::new(event_id).into()) | ||||
|  |  | |||
|  | @ -9,7 +9,7 @@ use ruma::{ | |||
|     events::EventType, | ||||
|     serde::Raw, | ||||
| }; | ||||
| use std::convert::TryInto; | ||||
| use std::{convert::TryInto, sync::Arc}; | ||||
| 
 | ||||
| #[cfg(feature = "conduit_bin")] | ||||
| use rocket::{get, put}; | ||||
|  | @ -69,9 +69,19 @@ pub async fn set_displayname_route( | |||
|         }) | ||||
|         .filter_map(|r| r.ok()) | ||||
|     { | ||||
|         let _ = db | ||||
|             .rooms | ||||
|             .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db); | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|         let _ = | ||||
|             db.rooms | ||||
|                 .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); | ||||
| 
 | ||||
|         // Presence update
 | ||||
|         db.rooms.edus.update_presence( | ||||
|  | @ -171,9 +181,19 @@ pub async fn set_avatar_url_route( | |||
|         }) | ||||
|         .filter_map(|r| r.ok()) | ||||
|     { | ||||
|         let _ = db | ||||
|             .rooms | ||||
|             .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db); | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|         let _ = | ||||
|             db.rooms | ||||
|                 .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock); | ||||
| 
 | ||||
|         // Presence update
 | ||||
|         db.rooms.edus.update_presence( | ||||
|  |  | |||
|  | @ -1,3 +1,5 @@ | |||
| use std::sync::Arc; | ||||
| 
 | ||||
| use crate::{database::DatabaseGuard, pdu::PduBuilder, ConduitResult, Ruma}; | ||||
| use ruma::{ | ||||
|     api::client::r0::redact::redact_event, | ||||
|  | @ -18,6 +20,16 @@ pub async fn redact_event_route( | |||
| ) -> ConduitResult<redact_event::Response> { | ||||
|     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(body.room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     let event_id = db.rooms.build_and_append_pdu( | ||||
|         PduBuilder { | ||||
|             event_type: EventType::RoomRedaction, | ||||
|  | @ -32,8 +44,11 @@ pub async fn redact_event_route( | |||
|         &sender_user, | ||||
|         &body.room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     Ok(redact_event::Response { event_id }.into()) | ||||
|  |  | |||
|  | @ -15,7 +15,7 @@ use ruma::{ | |||
|     serde::Raw, | ||||
|     RoomAliasId, RoomId, RoomVersionId, | ||||
| }; | ||||
| use std::{cmp::max, collections::BTreeMap, convert::TryFrom}; | ||||
| use std::{cmp::max, collections::BTreeMap, convert::TryFrom, sync::Arc}; | ||||
| 
 | ||||
| #[cfg(feature = "conduit_bin")] | ||||
| use rocket::{get, post}; | ||||
|  | @ -33,6 +33,16 @@ pub async fn create_room_route( | |||
| 
 | ||||
|     let room_id = RoomId::new(db.globals.server_name()); | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     let alias = body | ||||
|         .room_alias_name | ||||
|         .as_ref() | ||||
|  | @ -69,6 +79,7 @@ pub async fn create_room_route( | |||
|         &sender_user, | ||||
|         &room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // 2. Let the room creator join
 | ||||
|  | @ -90,6 +101,7 @@ pub async fn create_room_route( | |||
|         &sender_user, | ||||
|         &room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // 3. Power levels
 | ||||
|  | @ -144,6 +156,7 @@ pub async fn create_room_route( | |||
|         &sender_user, | ||||
|         &room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // 4. Events set by preset
 | ||||
|  | @ -170,6 +183,7 @@ pub async fn create_room_route( | |||
|         &sender_user, | ||||
|         &room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // 4.2 History Visibility
 | ||||
|  | @ -187,6 +201,7 @@ pub async fn create_room_route( | |||
|         &sender_user, | ||||
|         &room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // 4.3 Guest Access
 | ||||
|  | @ -212,6 +227,7 @@ pub async fn create_room_route( | |||
|         &sender_user, | ||||
|         &room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // 5. Events listed in initial_state
 | ||||
|  | @ -227,7 +243,7 @@ pub async fn create_room_route( | |||
|         } | ||||
| 
 | ||||
|         db.rooms | ||||
|             .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db)?; | ||||
|             .build_and_append_pdu(pdu_builder, &sender_user, &room_id, &db, &mutex_lock)?; | ||||
|     } | ||||
| 
 | ||||
|     // 6. Events implied by name and topic
 | ||||
|  | @ -248,6 +264,7 @@ pub async fn create_room_route( | |||
|             &sender_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
|     } | ||||
| 
 | ||||
|  | @ -266,10 +283,12 @@ pub async fn create_room_route( | |||
|             &sender_user, | ||||
|             &room_id, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
|     } | ||||
| 
 | ||||
|     // 7. Events implied by invite (and TODO: invite_3pid)
 | ||||
|     drop(mutex_lock); | ||||
|     for user_id in &body.invite { | ||||
|         let _ = invite_helper(sender_user, user_id, &room_id, &db, body.is_direct).await; | ||||
|     } | ||||
|  | @ -340,6 +359,16 @@ pub async fn upgrade_room_route( | |||
|     // Create a replacement room
 | ||||
|     let replacement_room = RoomId::new(db.globals.server_name()); | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(body.room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further
 | ||||
|     // Fail if the sender does not have the required permissions
 | ||||
|     let tombstone_event_id = db.rooms.build_and_append_pdu( | ||||
|  | @ -357,6 +386,7 @@ pub async fn upgrade_room_route( | |||
|         sender_user, | ||||
|         &body.room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // Get the old room federations status
 | ||||
|  | @ -397,6 +427,7 @@ pub async fn upgrade_room_route( | |||
|         sender_user, | ||||
|         &replacement_room, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // Join the new room
 | ||||
|  | @ -418,6 +449,7 @@ pub async fn upgrade_room_route( | |||
|         sender_user, | ||||
|         &replacement_room, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     // Recommended transferable state events list from the specs
 | ||||
|  | @ -451,6 +483,7 @@ pub async fn upgrade_room_route( | |||
|             sender_user, | ||||
|             &replacement_room, | ||||
|             &db, | ||||
|             &mutex_lock, | ||||
|         )?; | ||||
|     } | ||||
| 
 | ||||
|  | @ -494,8 +527,11 @@ pub async fn upgrade_room_route( | |||
|         sender_user, | ||||
|         &body.room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     db.flush().await?; | ||||
| 
 | ||||
|     // Return the replacement room id
 | ||||
|  |  | |||
|  | @ -1,3 +1,5 @@ | |||
| use std::sync::Arc; | ||||
| 
 | ||||
| use crate::{ | ||||
|     database::DatabaseGuard, pdu::PduBuilder, ConduitResult, Database, Error, Result, Ruma, | ||||
| }; | ||||
|  | @ -257,6 +259,16 @@ pub async fn send_state_event_for_key_helper( | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     let event_id = db.rooms.build_and_append_pdu( | ||||
|         PduBuilder { | ||||
|             event_type, | ||||
|  | @ -268,6 +280,7 @@ pub async fn send_state_event_for_key_helper( | |||
|         &sender_user, | ||||
|         &room_id, | ||||
|         &db, | ||||
|         &mutex_lock, | ||||
|     )?; | ||||
| 
 | ||||
|     Ok(event_id) | ||||
|  |  | |||
|  | @ -189,6 +189,18 @@ async fn sync_helper( | |||
|     for room_id in db.rooms.rooms_joined(&sender_user) { | ||||
|         let room_id = room_id?; | ||||
| 
 | ||||
|         // Get and drop the lock to wait for remaining operations to finish
 | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
|         drop(mutex_lock); | ||||
| 
 | ||||
|         let mut non_timeline_pdus = db | ||||
|             .rooms | ||||
|             .pdus_until(&sender_user, &room_id, u64::MAX) | ||||
|  | @ -641,6 +653,19 @@ async fn sync_helper( | |||
|     let mut left_rooms = BTreeMap::new(); | ||||
|     for result in db.rooms.rooms_left(&sender_user) { | ||||
|         let (room_id, left_state_events) = result?; | ||||
| 
 | ||||
|         // Get and drop the lock to wait for remaining operations to finish
 | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
|         drop(mutex_lock); | ||||
| 
 | ||||
|         let left_count = db.rooms.get_left_count(&room_id, &sender_user)?; | ||||
| 
 | ||||
|         // Left before last sync
 | ||||
|  | @ -667,6 +692,19 @@ async fn sync_helper( | |||
|     let mut invited_rooms = BTreeMap::new(); | ||||
|     for result in db.rooms.rooms_invited(&sender_user) { | ||||
|         let (room_id, invite_state_events) = result?; | ||||
| 
 | ||||
|         // Get and drop the lock to wait for remaining operations to finish
 | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|                 .or_default(), | ||||
|         ); | ||||
|         let mutex_lock = mutex.lock().await; | ||||
|         drop(mutex_lock); | ||||
| 
 | ||||
|         let invite_count = db.rooms.get_invite_count(&room_id, &sender_user)?; | ||||
| 
 | ||||
|         // Invited before last sync
 | ||||
|  |  | |||
|  | @ -10,7 +10,7 @@ use ruma::{ | |||
|     events::{room::message, EventType}, | ||||
|     UserId, | ||||
| }; | ||||
| use tokio::sync::{RwLock, RwLockReadGuard}; | ||||
| use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard}; | ||||
| 
 | ||||
| pub enum AdminCommand { | ||||
|     RegisterAppservice(serde_yaml::Value), | ||||
|  | @ -48,38 +48,51 @@ impl Admin { | |||
|                 ) | ||||
|                 .unwrap(); | ||||
| 
 | ||||
|             if conduit_room.is_none() { | ||||
|                 warn!("Conduit instance does not have an #admins room. Logging to that room will not work. Restart Conduit after creating a user to fix this."); | ||||
|             } | ||||
|             let conduit_room = match conduit_room { | ||||
|                 None => { | ||||
|                     warn!("Conduit instance does not have an #admins room. Logging to that room will not work. Restart Conduit after creating a user to fix this."); | ||||
|                     return; | ||||
|                 } | ||||
|                 Some(r) => r, | ||||
|             }; | ||||
| 
 | ||||
|             drop(guard); | ||||
| 
 | ||||
|             let send_message = | ||||
|                 |message: message::MessageEventContent, guard: RwLockReadGuard<'_, Database>| { | ||||
|                     if let Some(conduit_room) = &conduit_room { | ||||
|                         guard | ||||
|                             .rooms | ||||
|                             .build_and_append_pdu( | ||||
|                                 PduBuilder { | ||||
|                                     event_type: EventType::RoomMessage, | ||||
|                                     content: serde_json::to_value(message) | ||||
|                                         .expect("event is valid, we just created it"), | ||||
|                                     unsigned: None, | ||||
|                                     state_key: None, | ||||
|                                     redacts: None, | ||||
|                                 }, | ||||
|                                 &conduit_user, | ||||
|                                 &conduit_room, | ||||
|                                 &guard, | ||||
|                             ) | ||||
|                             .unwrap(); | ||||
|                     } | ||||
|                 }; | ||||
|             let send_message = |message: message::MessageEventContent, | ||||
|                                 guard: RwLockReadGuard<'_, Database>, | ||||
|                                 mutex_lock: &MutexGuard<'_, ()>| { | ||||
|                 guard | ||||
|                     .rooms | ||||
|                     .build_and_append_pdu( | ||||
|                         PduBuilder { | ||||
|                             event_type: EventType::RoomMessage, | ||||
|                             content: serde_json::to_value(message) | ||||
|                                 .expect("event is valid, we just created it"), | ||||
|                             unsigned: None, | ||||
|                             state_key: None, | ||||
|                             redacts: None, | ||||
|                         }, | ||||
|                         &conduit_user, | ||||
|                         &conduit_room, | ||||
|                         &guard, | ||||
|                         mutex_lock, | ||||
|                     ) | ||||
|                     .unwrap(); | ||||
|             }; | ||||
| 
 | ||||
|             loop { | ||||
|                 tokio::select! { | ||||
|                     Some(event) = receiver.next() => { | ||||
|                         let guard = db.read().await; | ||||
|                         let mutex = Arc::clone( | ||||
|                             guard.globals | ||||
|                                 .roomid_mutex | ||||
|                                 .write() | ||||
|                                 .unwrap() | ||||
|                                 .entry(conduit_room.clone()) | ||||
|                                 .or_default(), | ||||
|                         ); | ||||
|                         let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|                         match event { | ||||
|                             AdminCommand::RegisterAppservice(yaml) => { | ||||
|  | @ -93,15 +106,17 @@ impl Admin { | |||
|                                         count, | ||||
|                                         appservices.into_iter().filter_map(|r| r.ok()).collect::<Vec<_>>().join(", ") | ||||
|                                     ); | ||||
|                                     send_message(message::MessageEventContent::text_plain(output), guard); | ||||
|                                     send_message(message::MessageEventContent::text_plain(output), guard, &mutex_lock); | ||||
|                                 } else { | ||||
|                                     send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard); | ||||
|                                     send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &mutex_lock); | ||||
|                                 } | ||||
|                             } | ||||
|                             AdminCommand::SendMessage(message) => { | ||||
|                                 send_message(message, guard) | ||||
|                                 send_message(message, guard, &mutex_lock); | ||||
|                             } | ||||
|                         } | ||||
| 
 | ||||
|                         drop(mutex_lock); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|  |  | |||
|  | @ -46,6 +46,7 @@ pub struct Globals { | |||
|     pub servername_ratelimiter: Arc<RwLock<BTreeMap<Box<ServerName>, Arc<Semaphore>>>>, | ||||
|     pub sync_receivers: RwLock<BTreeMap<(UserId, Box<DeviceId>), SyncHandle>>, | ||||
|     pub roomid_mutex: RwLock<BTreeMap<RoomId, Arc<Mutex<()>>>>, | ||||
|     pub roomid_mutex_federation: RwLock<BTreeMap<RoomId, Arc<Mutex<()>>>>, // this lock will be held longer
 | ||||
|     pub rotate: RotationHandler, | ||||
| } | ||||
| 
 | ||||
|  | @ -199,6 +200,7 @@ impl Globals { | |||
|             bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), | ||||
|             servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())), | ||||
|             roomid_mutex: RwLock::new(BTreeMap::new()), | ||||
|             roomid_mutex_federation: RwLock::new(BTreeMap::new()), | ||||
|             sync_receivers: RwLock::new(BTreeMap::new()), | ||||
|             rotate: RotationHandler::new(), | ||||
|         }; | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ mod edus; | |||
| 
 | ||||
| pub use edus::RoomEdus; | ||||
| use member::MembershipState; | ||||
| use tokio::sync::MutexGuard; | ||||
| 
 | ||||
| use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result}; | ||||
| use log::{debug, error, warn}; | ||||
|  | @ -1207,6 +1208,7 @@ impl Rooms { | |||
|         sender: &UserId, | ||||
|         room_id: &RoomId, | ||||
|         db: &Database, | ||||
|         _mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
 | ||||
|     ) -> Result<EventId> { | ||||
|         let PduBuilder { | ||||
|             event_type, | ||||
|  | @ -1216,7 +1218,6 @@ impl Rooms { | |||
|             redacts, | ||||
|         } = pdu_builder; | ||||
| 
 | ||||
|         // TODO: Make sure this isn't called twice in parallel
 | ||||
|         let prev_events = self | ||||
|             .get_pdu_leaves(&room_id)? | ||||
|             .into_iter() | ||||
|  | @ -1790,6 +1791,16 @@ impl Rooms { | |||
|                 db, | ||||
|             )?; | ||||
|         } else { | ||||
|             let mutex = Arc::clone( | ||||
|                 db.globals | ||||
|                     .roomid_mutex | ||||
|                     .write() | ||||
|                     .unwrap() | ||||
|                     .entry(room_id.clone()) | ||||
|                     .or_default(), | ||||
|             ); | ||||
|             let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|             let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>( | ||||
|                 self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())? | ||||
|                     .ok_or(Error::BadRequest( | ||||
|  | @ -1817,6 +1828,7 @@ impl Rooms { | |||
|                 user_id, | ||||
|                 room_id, | ||||
|                 db, | ||||
|                 &mutex_lock, | ||||
|             )?; | ||||
|         } | ||||
| 
 | ||||
|  |  | |||
|  | @ -640,7 +640,7 @@ pub async fn send_transaction_message_route( | |||
| 
 | ||||
|         let mutex = Arc::clone( | ||||
|             db.globals | ||||
|                 .roomid_mutex | ||||
|                 .roomid_mutex_federation | ||||
|                 .write() | ||||
|                 .unwrap() | ||||
|                 .entry(room_id.clone()) | ||||
|  | @ -1308,11 +1308,13 @@ pub fn handle_incoming_pdu<'a>( | |||
|             pdu_id = Some( | ||||
|                 append_incoming_pdu( | ||||
|                     &db, | ||||
|                     &room_id, | ||||
|                     &incoming_pdu, | ||||
|                     val, | ||||
|                     extremities, | ||||
|                     &state_at_incoming_event, | ||||
|                 ) | ||||
|                 .await | ||||
|                 .map_err(|_| "Failed to add pdu to db.".to_owned())?, | ||||
|             ); | ||||
|             debug!("Appended incoming pdu."); | ||||
|  | @ -1611,13 +1613,24 @@ pub(crate) async fn fetch_signing_keys( | |||
| /// Append the incoming event setting the state snapshot to the state from the
 | ||||
| /// server that sent the event.
 | ||||
| #[tracing::instrument(skip(db))] | ||||
| pub(crate) fn append_incoming_pdu( | ||||
| async fn append_incoming_pdu( | ||||
|     db: &Database, | ||||
|     room_id: &RoomId, | ||||
|     pdu: &PduEvent, | ||||
|     pdu_json: CanonicalJsonObject, | ||||
|     new_room_leaves: HashSet<EventId>, | ||||
|     state: &StateMap<Arc<PduEvent>>, | ||||
| ) -> Result<Vec<u8>> { | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(room_id.clone()) | ||||
|             .or_default(), | ||||
|     ); | ||||
|     let mutex_lock = mutex.lock().await; | ||||
| 
 | ||||
|     // We append to state before appending the pdu, so we don't have a moment in time with the
 | ||||
|     // pdu without it's state. This is okay because append_pdu can't fail.
 | ||||
|     db.rooms | ||||
|  | @ -1630,6 +1643,8 @@ pub(crate) fn append_incoming_pdu( | |||
|         &db, | ||||
|     )?; | ||||
| 
 | ||||
|     drop(mutex_lock); | ||||
| 
 | ||||
|     for appservice in db.appservice.iter_all()?.filter_map(|r| r.ok()) { | ||||
|         if let Some(namespaces) = appservice.1.get("namespaces") { | ||||
|             let users = namespaces | ||||
|  | @ -2145,7 +2160,7 @@ pub async fn create_join_event_route( | |||
| 
 | ||||
|     let mutex = Arc::clone( | ||||
|         db.globals | ||||
|             .roomid_mutex | ||||
|             .roomid_mutex_federation | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             .entry(body.room_id.clone()) | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue