improvement: only remove to-device events when sure the client received them
To make this work, I had to remove the recommended limit of 100 to-device events per /sync (https://matrix.org/docs/spec/client_server/latest#id72)
This commit is contained in:
		
							parent
							
								
									678f33acf9
								
							
						
					
					
						commit
						16576d19cd
					
				
					 2 changed files with 43 additions and 8 deletions
				
			
		|  | @ -2659,6 +2659,9 @@ pub fn sync_route( | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // Remove all to-device events the device received *last time*
 | ||||
|     db.users.remove_to_device_events(user_id, device_id, since)?; | ||||
| 
 | ||||
|     Ok(sync_events::Response { | ||||
|         next_batch, | ||||
|         rooms: sync_events::Rooms { | ||||
|  | @ -2711,7 +2714,7 @@ pub fn sync_route( | |||
|         }, | ||||
|         device_one_time_keys_count: Default::default(), // TODO
 | ||||
|         to_device: sync_events::ToDevice { | ||||
|             events: db.users.take_to_device_events(user_id, device_id, 100)?, | ||||
|             events: db.users.get_to_device_events(user_id, device_id)?, | ||||
|         }, | ||||
|     } | ||||
|     .into()) | ||||
|  |  | |||
|  | @ -11,7 +11,7 @@ use ruma::{ | |||
|     events::{AnyToDeviceEvent, EventJson, EventType}, | ||||
|     identifiers::{DeviceId, UserId}, | ||||
| }; | ||||
| use std::{collections::BTreeMap, convert::TryFrom, time::SystemTime}; | ||||
| use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime}; | ||||
| 
 | ||||
| pub struct Users { | ||||
|     pub(super) userid_password: sled::Tree, | ||||
|  | @ -660,11 +660,10 @@ impl Users { | |||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn take_to_device_events( | ||||
|     pub fn get_to_device_events( | ||||
|         &self, | ||||
|         user_id: &UserId, | ||||
|         device_id: &DeviceId, | ||||
|         max: usize, | ||||
|     ) -> Result<Vec<EventJson<AnyToDeviceEvent>>> { | ||||
|         let mut events = Vec::new(); | ||||
| 
 | ||||
|  | @ -673,18 +672,51 @@ impl Users { | |||
|         prefix.extend_from_slice(device_id.as_str().as_bytes()); | ||||
|         prefix.push(0xff); | ||||
| 
 | ||||
|         for result in self.todeviceid_events.scan_prefix(&prefix).take(max) { | ||||
|             let (key, value) = result?; | ||||
|         for value in self.todeviceid_events.scan_prefix(&prefix).values() { | ||||
|             events.push( | ||||
|                 serde_json::from_slice(&*value) | ||||
|                 serde_json::from_slice(&*value?) | ||||
|                     .map_err(|_| Error::bad_database("Event in todeviceid_events is invalid."))?, | ||||
|             ); | ||||
|             self.todeviceid_events.remove(key)?; | ||||
|         } | ||||
| 
 | ||||
|         Ok(events) | ||||
|     } | ||||
| 
 | ||||
|     pub fn remove_to_device_events( | ||||
|         &self, | ||||
|         user_id: &UserId, | ||||
|         device_id: &DeviceId, | ||||
|         until: u64, | ||||
|     ) -> Result<()> { | ||||
|         let mut prefix = user_id.to_string().as_bytes().to_vec(); | ||||
|         prefix.push(0xff); | ||||
|         prefix.extend_from_slice(device_id.as_ref().as_bytes()); | ||||
|         prefix.push(0xff); | ||||
| 
 | ||||
|         let mut last = prefix.clone(); | ||||
|         last.extend_from_slice(&until.to_be_bytes()); | ||||
| 
 | ||||
|         for (key, _) in self | ||||
|             .todeviceid_events | ||||
|             .range(&*prefix..=&*last) | ||||
|             .keys() | ||||
|             .map(|key| { | ||||
|                 let key = key?; | ||||
|                 Ok::<_, Error>(( | ||||
|                     key.clone(), | ||||
|                     utils::u64_from_bytes(&key[key.len() - mem::size_of::<u64>()..key.len()]) | ||||
|                         .map_err(|_| Error::bad_database("ToDeviceId has invalid count bytes."))?, | ||||
|                 )) | ||||
|             }) | ||||
|             .filter_map(|r| r.ok()) | ||||
|             .take_while(|&(_, count)| count <= until) | ||||
|         { | ||||
|             self.todeviceid_events.remove(key)?; | ||||
|         } | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn update_device_metadata( | ||||
|         &self, | ||||
|         user_id: &UserId, | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue