feat: handle txn ids
This commit is contained in:
		
							parent
							
								
									6343eea417
								
							
						
					
					
						commit
						4954df3cc3
					
				
					 8 changed files with 111 additions and 9 deletions
				
			
		|  | @ -1,10 +1,13 @@ | ||||||
| use super::State; | use super::State; | ||||||
| use crate::{pdu::PduBuilder, ConduitResult, Database, Error, Ruma}; | use crate::{pdu::PduBuilder, utils, ConduitResult, Database, Error, Ruma}; | ||||||
| use ruma::api::client::{ | use ruma::{ | ||||||
|  |     api::client::{ | ||||||
|         error::ErrorKind, |         error::ErrorKind, | ||||||
|         r0::message::{get_message_events, send_message_event}, |         r0::message::{get_message_events, send_message_event}, | ||||||
|  |     }, | ||||||
|  |     EventId, | ||||||
| }; | }; | ||||||
| use std::convert::TryInto; | use std::convert::{TryFrom, TryInto}; | ||||||
| 
 | 
 | ||||||
| #[cfg(feature = "conduit_bin")] | #[cfg(feature = "conduit_bin")] | ||||||
| use rocket::{get, put}; | use rocket::{get, put}; | ||||||
|  | @ -18,6 +21,29 @@ pub fn send_message_event_route( | ||||||
|     body: Ruma<send_message_event::IncomingRequest>, |     body: Ruma<send_message_event::IncomingRequest>, | ||||||
| ) -> ConduitResult<send_message_event::Response> { | ) -> ConduitResult<send_message_event::Response> { | ||||||
|     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); |     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); | ||||||
|  |     let device_id = body.device_id.as_ref().expect("user is authenticated"); | ||||||
|  | 
 | ||||||
|  |     // Check if this is a new transaction id
 | ||||||
|  |     if let Some(response) = db | ||||||
|  |         .transaction_ids | ||||||
|  |         .existing_txnid(sender_id, device_id, &body.txn_id)? | ||||||
|  |     { | ||||||
|  |         // The client might have sent a txnid of the /sendToDevice endpoint
 | ||||||
|  |         // This txnid has no response associated with it
 | ||||||
|  |         if response.is_empty() { | ||||||
|  |             return Err(Error::BadRequest( | ||||||
|  |                 ErrorKind::InvalidParam, | ||||||
|  |                 "Tried to use txn id already used for an incompatible endpoint.", | ||||||
|  |             )); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let event_id = EventId::try_from( | ||||||
|  |             utils::string_from_bytes(&response) | ||||||
|  |                 .map_err(|_| Error::bad_database("Invalid txnid bytes in database."))?, | ||||||
|  |         ) | ||||||
|  |         .map_err(|_| Error::bad_database("Invalid event id in txnid data."))?; | ||||||
|  |         return Ok(send_message_event::Response { event_id }.into()); | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     let mut unsigned = serde_json::Map::new(); |     let mut unsigned = serde_json::Map::new(); | ||||||
|     unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into()); |     unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into()); | ||||||
|  | @ -29,6 +55,7 @@ pub fn send_message_event_route( | ||||||
|             event_type: body.event_type.clone(), |             event_type: body.event_type.clone(), | ||||||
|             content: serde_json::from_str( |             content: serde_json::from_str( | ||||||
|                 body.json_body |                 body.json_body | ||||||
|  |                     .as_ref() | ||||||
|                     .ok_or(Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))? |                     .ok_or(Error::BadRequest(ErrorKind::BadJson, "Invalid JSON body."))? | ||||||
|                     .get(), |                     .get(), | ||||||
|             ) |             ) | ||||||
|  | @ -41,6 +68,8 @@ pub fn send_message_event_route( | ||||||
|         &db.account_data, |         &db.account_data, | ||||||
|     )?; |     )?; | ||||||
| 
 | 
 | ||||||
|  |     db.transaction_ids | ||||||
|  |         .add_txnid(sender_id, device_id, &body.txn_id, event_id.as_bytes())?; | ||||||
|     Ok(send_message_event::Response { event_id }.into()) |     Ok(send_message_event::Response { event_id }.into()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -17,6 +17,16 @@ pub fn send_event_to_device_route( | ||||||
|     body: Ruma<send_event_to_device::IncomingRequest>, |     body: Ruma<send_event_to_device::IncomingRequest>, | ||||||
| ) -> ConduitResult<send_event_to_device::Response> { | ) -> ConduitResult<send_event_to_device::Response> { | ||||||
|     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); |     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); | ||||||
|  |     let device_id = body.device_id.as_ref().expect("user is authenticated"); | ||||||
|  | 
 | ||||||
|  |     // Check if this is a new transaction id
 | ||||||
|  |     if db | ||||||
|  |         .transaction_ids | ||||||
|  |         .existing_txnid(sender_id, device_id, &body.txn_id)? | ||||||
|  |         .is_some() | ||||||
|  |     { | ||||||
|  |         return Ok(send_event_to_device::Response.into()); | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     for (target_user_id, map) in &body.messages { |     for (target_user_id, map) in &body.messages { | ||||||
|         for (target_device_id_maybe, event) in map { |         for (target_device_id_maybe, event) in map { | ||||||
|  | @ -52,5 +62,9 @@ pub fn send_event_to_device_route( | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     // Save transaction id with empty data
 | ||||||
|  |     db.transaction_ids | ||||||
|  |         .add_txnid(sender_id, device_id, &body.txn_id, &[])?; | ||||||
|  | 
 | ||||||
|     Ok(send_event_to_device::Response.into()) |     Ok(send_event_to_device::Response.into()) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -3,6 +3,7 @@ pub mod globals; | ||||||
| pub mod key_backups; | pub mod key_backups; | ||||||
| pub mod media; | pub mod media; | ||||||
| pub mod rooms; | pub mod rooms; | ||||||
|  | pub mod transaction_ids; | ||||||
| pub mod uiaa; | pub mod uiaa; | ||||||
| pub mod users; | pub mod users; | ||||||
| 
 | 
 | ||||||
|  | @ -23,6 +24,7 @@ pub struct Database { | ||||||
|     pub account_data: account_data::AccountData, |     pub account_data: account_data::AccountData, | ||||||
|     pub media: media::Media, |     pub media: media::Media, | ||||||
|     pub key_backups: key_backups::KeyBackups, |     pub key_backups: key_backups::KeyBackups, | ||||||
|  |     pub transaction_ids: transaction_ids::TransactionIds, | ||||||
|     pub _db: sled::Db, |     pub _db: sled::Db, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -90,7 +92,8 @@ impl Database { | ||||||
|                 edus: rooms::RoomEdus { |                 edus: rooms::RoomEdus { | ||||||
|                     readreceiptid_readreceipt: db.open_tree("readreceiptid_readreceipt")?, |                     readreceiptid_readreceipt: db.open_tree("readreceiptid_readreceipt")?, | ||||||
|                     roomuserid_privateread: db.open_tree("roomuserid_privateread")?, // "Private" read receipt
 |                     roomuserid_privateread: db.open_tree("roomuserid_privateread")?, // "Private" read receipt
 | ||||||
|                     roomuserid_lastprivatereadupdate: db.open_tree("roomid_lastprivatereadupdate")?, |                     roomuserid_lastprivatereadupdate: db | ||||||
|  |                         .open_tree("roomid_lastprivatereadupdate")?, | ||||||
|                     typingid_userid: db.open_tree("typingid_userid")?, |                     typingid_userid: db.open_tree("typingid_userid")?, | ||||||
|                     roomid_lasttypingupdate: db.open_tree("roomid_lasttypingupdate")?, |                     roomid_lasttypingupdate: db.open_tree("roomid_lasttypingupdate")?, | ||||||
|                     presenceid_presence: db.open_tree("presenceid_presence")?, |                     presenceid_presence: db.open_tree("presenceid_presence")?, | ||||||
|  | @ -124,6 +127,9 @@ impl Database { | ||||||
|                 backupid_etag: db.open_tree("backupid_etag")?, |                 backupid_etag: db.open_tree("backupid_etag")?, | ||||||
|                 backupkeyid_backup: db.open_tree("backupkeyid_backupmetadata")?, |                 backupkeyid_backup: db.open_tree("backupkeyid_backupmetadata")?, | ||||||
|             }, |             }, | ||||||
|  |             transaction_ids: transaction_ids::TransactionIds { | ||||||
|  |                 userdevicetxnid_response: db.open_tree("userdevicetxnid_response")?, | ||||||
|  |             }, | ||||||
|             _db: db, |             _db: db, | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -621,7 +621,8 @@ impl Rooms { | ||||||
|             } |             } | ||||||
|             _ => {} |             _ => {} | ||||||
|         } |         } | ||||||
|         self.edus.private_read_set(&room_id, &sender, index, &globals)?; |         self.edus | ||||||
|  |             .private_read_set(&room_id, &sender, index, &globals)?; | ||||||
| 
 | 
 | ||||||
|         Ok(pdu.event_id) |         Ok(pdu.event_id) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -92,7 +92,13 @@ impl RoomEdus { | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// Sets a private read marker at `count`.
 |     /// Sets a private read marker at `count`.
 | ||||||
|     pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64, globals: &super::super::globals::Globals) -> Result<()> { |     pub fn private_read_set( | ||||||
|  |         &self, | ||||||
|  |         room_id: &RoomId, | ||||||
|  |         user_id: &UserId, | ||||||
|  |         count: u64, | ||||||
|  |         globals: &super::super::globals::Globals, | ||||||
|  |     ) -> Result<()> { | ||||||
|         let mut key = room_id.to_string().as_bytes().to_vec(); |         let mut key = room_id.to_string().as_bytes().to_vec(); | ||||||
|         key.push(0xff); |         key.push(0xff); | ||||||
|         key.extend_from_slice(&user_id.to_string().as_bytes()); |         key.extend_from_slice(&user_id.to_string().as_bytes()); | ||||||
|  |  | ||||||
							
								
								
									
										43
									
								
								src/database/transaction_ids.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										43
									
								
								src/database/transaction_ids.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,43 @@ | ||||||
|  | use crate::Result; | ||||||
|  | use ruma::{DeviceId, UserId}; | ||||||
|  | use sled::IVec; | ||||||
|  | 
 | ||||||
|  | pub struct TransactionIds { | ||||||
|  |     pub(super) userdevicetxnid_response: sled::Tree, // Response can be empty (/sendToDevice) or the event id (/send)
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl TransactionIds { | ||||||
|  |     pub fn add_txnid( | ||||||
|  |         &self, | ||||||
|  |         user_id: &UserId, | ||||||
|  |         device_id: &DeviceId, | ||||||
|  |         txn_id: &str, | ||||||
|  |         data: &[u8], | ||||||
|  |     ) -> Result<()> { | ||||||
|  |         let mut key = user_id.as_bytes().to_vec(); | ||||||
|  |         key.push(0xff); | ||||||
|  |         key.extend_from_slice(device_id.as_bytes()); | ||||||
|  |         key.push(0xff); | ||||||
|  |         key.extend_from_slice(txn_id.as_bytes()); | ||||||
|  | 
 | ||||||
|  |         self.userdevicetxnid_response.insert(key, data)?; | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     pub fn existing_txnid( | ||||||
|  |         &self, | ||||||
|  |         user_id: &UserId, | ||||||
|  |         device_id: &DeviceId, | ||||||
|  |         txn_id: &str, | ||||||
|  |     ) -> Result<Option<IVec>> { | ||||||
|  |         let mut key = user_id.as_bytes().to_vec(); | ||||||
|  |         key.push(0xff); | ||||||
|  |         key.extend_from_slice(device_id.as_bytes()); | ||||||
|  |         key.push(0xff); | ||||||
|  |         key.extend_from_slice(txn_id.as_bytes()); | ||||||
|  | 
 | ||||||
|  |         // If there's no entry, this is a new transaction
 | ||||||
|  |         Ok(self.userdevicetxnid_response.get(key)?) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -71,7 +71,9 @@ impl PduEvent { | ||||||
| 
 | 
 | ||||||
|         self.unsigned.insert( |         self.unsigned.insert( | ||||||
|             "redacted_because".to_owned(), |             "redacted_because".to_owned(), | ||||||
|             serde_json::to_string(reason).expect("PduEvent::to_string always works").into() |             serde_json::to_string(reason) | ||||||
|  |                 .expect("PduEvent::to_string always works") | ||||||
|  |                 .into(), | ||||||
|         ); |         ); | ||||||
| 
 | 
 | ||||||
|         self.content = new_content.into(); |         self.content = new_content.into(); | ||||||
|  |  | ||||||
|  | @ -38,6 +38,7 @@ Current state appears in timeline in private history with many messages before | ||||||
| Deleted tags appear in an incremental v2 /sync | Deleted tags appear in an incremental v2 /sync | ||||||
| Deleting a non-existent alias should return a 404 | Deleting a non-existent alias should return a 404 | ||||||
| Device messages wake up /sync | Device messages wake up /sync | ||||||
|  | Device messages with the same txn_id are deduplicated | ||||||
| Events come down the correct room | Events come down the correct room | ||||||
| GET /device/{deviceId} | GET /device/{deviceId} | ||||||
| GET /device/{deviceId} gives a 404 for unknown devices | GET /device/{deviceId} gives a 404 for unknown devices | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue