feat: read receipts
This commit is contained in:
		
							parent
							
								
									ab8c2f1e5e
								
							
						
					
					
						commit
						3b9cadeec2
					
				
					 4 changed files with 195 additions and 21 deletions
				
			
		|  | @ -30,7 +30,10 @@ use ruma_client_api::{ | |||
|     }, | ||||
|     unversioned::get_supported_versions, | ||||
| }; | ||||
| use ruma_events::{collections::only::Event, EventType}; | ||||
| use ruma_events::{ | ||||
|     collections::only::{Event as EduEvent, Event}, | ||||
|     EventType, | ||||
| }; | ||||
| use ruma_identifiers::{RoomId, RoomIdOrAliasId, UserId}; | ||||
| use serde_json::json; | ||||
| use std::{ | ||||
|  | @ -458,6 +461,33 @@ pub fn set_read_marker_route( | |||
|     body: Ruma<set_read_marker::Request>, | ||||
|     _room_id: String, | ||||
| ) -> MatrixResult<set_read_marker::Response> { | ||||
|     let user_id = body.user_id.clone().expect("user is authenticated"); | ||||
|     // TODO: Fully read
 | ||||
|     if let Some(event) = &body.read_receipt { | ||||
|         let mut user_receipts = HashMap::new(); | ||||
|         user_receipts.insert( | ||||
|             user_id.clone(), | ||||
|             ruma_events::receipt::Receipt { | ||||
|                 ts: Some(utils::millis_since_unix_epoch()), | ||||
|             }, | ||||
|         ); | ||||
|         let mut receipt_content = HashMap::new(); | ||||
|         receipt_content.insert( | ||||
|             event.clone(), | ||||
|             ruma_events::receipt::Receipts { | ||||
|                 read: Some(user_receipts), | ||||
|             }, | ||||
|         ); | ||||
| 
 | ||||
|         data.roomlatest_update( | ||||
|             &user_id, | ||||
|             &body.room_id, | ||||
|             EduEvent::Receipt(ruma_events::receipt::ReceiptEvent { | ||||
|                 content: receipt_content, | ||||
|                 room_id: None, // None because it can be inferred
 | ||||
|             }), | ||||
|         ); | ||||
|     } | ||||
|     MatrixResult(Ok(set_read_marker::Response)) | ||||
| } | ||||
| 
 | ||||
|  | @ -707,16 +737,17 @@ pub fn sync_route( | |||
| 
 | ||||
|     let mut joined_rooms = HashMap::new(); | ||||
|     let joined_roomids = data.rooms_joined(body.user_id.as_ref().expect("user is authenticated")); | ||||
|     let since = body | ||||
|         .since | ||||
|         .clone() | ||||
|         .and_then(|string| string.parse().ok()) | ||||
|         .unwrap_or(0); | ||||
|     for room_id in joined_roomids { | ||||
|         let pdus = if let Some(since) = body.since.clone().and_then(|string| string.parse().ok()) { | ||||
|             data.pdus_since(&room_id, since) | ||||
|         } else { | ||||
|             data.pdus_all(&room_id) | ||||
|         }; | ||||
|         let pdus = { data.pdus_since(&room_id, since) }; | ||||
|         let room_events = pdus.into_iter().map(|pdu| pdu.to_room_event()).collect(); | ||||
| 
 | ||||
|         joined_rooms.insert( | ||||
|             room_id.try_into().unwrap(), | ||||
|             room_id.clone().try_into().unwrap(), | ||||
|             sync_events::JoinedRoom { | ||||
|                 account_data: sync_events::AccountData { events: Vec::new() }, | ||||
|                 summary: sync_events::RoomSummary { | ||||
|  | @ -734,7 +765,9 @@ pub fn sync_route( | |||
|                     events: room_events, | ||||
|                 }, | ||||
|                 state: sync_events::State { events: Vec::new() }, | ||||
|                 ephemeral: sync_events::Ephemeral { events: Vec::new() }, | ||||
|                 ephemeral: sync_events::Ephemeral { | ||||
|                     events: data.roomlatests_since(&room_id, since), | ||||
|                 }, | ||||
|             }, | ||||
|         ); | ||||
|     } | ||||
|  |  | |||
							
								
								
									
										107
									
								
								src/data.rs
									
									
									
									
									
								
							
							
						
						
									
										107
									
								
								src/data.rs
									
									
									
									
									
								
							|  | @ -1,7 +1,8 @@ | |||
| use crate::{utils, Database, PduEvent}; | ||||
| use ruma_events::EventType; | ||||
| use ruma_events::{collections::only::Event as EduEvent, EventResult, EventType}; | ||||
| use ruma_federation_api::RoomV3Pdu; | ||||
| use ruma_identifiers::{EventId, RoomId, UserId}; | ||||
| use serde_json::json; | ||||
| use std::{ | ||||
|     collections::HashMap, | ||||
|     convert::{TryFrom, TryInto}, | ||||
|  | @ -167,6 +168,15 @@ impl Data { | |||
|             user_id.to_string().as_bytes().into(), | ||||
|         ); | ||||
| 
 | ||||
|         self.pdu_append( | ||||
|             room_id.clone(), | ||||
|             user_id.clone(), | ||||
|             EventType::RoomMember, | ||||
|             json!({"membership": "join"}), | ||||
|             None, | ||||
|             Some(user_id.to_string()), | ||||
|         ); | ||||
| 
 | ||||
|         true | ||||
|     } | ||||
| 
 | ||||
|  | @ -187,7 +197,7 @@ impl Data { | |||
|         // Create the first part of the full pdu id
 | ||||
|         let mut prefix = vec![b'd']; | ||||
|         prefix.extend_from_slice(room_id.to_string().as_bytes()); | ||||
|         prefix.push(b'#'); // Add delimiter so we don't find rooms starting with the same id
 | ||||
|         prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id
 | ||||
| 
 | ||||
|         if let Some((key, _)) = self.db.pduid_pdu.get_gt(&prefix).unwrap() { | ||||
|             if key.starts_with(&prefix) { | ||||
|  | @ -334,14 +344,12 @@ impl Data { | |||
|         // The new value will need a new index. We store the last used index in 'n'
 | ||||
|         // The count will go up regardless of the room_id
 | ||||
|         // This is also the next_batch/since value
 | ||||
|         let count_key: Vec<u8> = vec![b'n']; | ||||
| 
 | ||||
|         // Increment the last index and use that
 | ||||
|         let index = utils::u64_from_bytes( | ||||
|             &self | ||||
|                 .db | ||||
|                 .pduid_pdu | ||||
|                 .update_and_fetch(&count_key, utils::increment) | ||||
|                 .update_and_fetch(b"n", utils::increment) | ||||
|                 .unwrap() | ||||
|                 .unwrap(), | ||||
|         ); | ||||
|  | @ -349,7 +357,7 @@ impl Data { | |||
|         let mut pdu_id = vec![b'd']; | ||||
|         pdu_id.extend_from_slice(room_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         pdu_id.push(b'#'); // Add delimiter so we don't find rooms starting with the same id
 | ||||
|         pdu_id.push(0xff); // Add delimiter so we don't find rooms starting with the same id
 | ||||
|         pdu_id.extend_from_slice(&index.to_be_bytes()); | ||||
| 
 | ||||
|         self.db | ||||
|  | @ -389,7 +397,7 @@ impl Data { | |||
|         // Create the first part of the full pdu id
 | ||||
|         let mut prefix = vec![b'd']; | ||||
|         prefix.extend_from_slice(room_id.to_string().as_bytes()); | ||||
|         prefix.push(b'#'); // Add delimiter so we don't find rooms starting with the same id
 | ||||
|         prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id
 | ||||
| 
 | ||||
|         let mut current = prefix.clone(); | ||||
|         current.extend_from_slice(&since.to_be_bytes()); | ||||
|  | @ -406,6 +414,91 @@ impl Data { | |||
|         pdus | ||||
|     } | ||||
| 
 | ||||
|     pub fn roomlatest_update(&self, user_id: &UserId, room_id: &RoomId, event: EduEvent) { | ||||
|         let mut prefix = vec![b'd']; | ||||
|         prefix.extend_from_slice(room_id.to_string().as_bytes()); | ||||
|         prefix.push(0xff); | ||||
| 
 | ||||
|         // Start with last
 | ||||
|         if let Some(mut current) = self | ||||
|             .db | ||||
|             .roomlatestid_roomlatest | ||||
|             .scan_prefix(&prefix) | ||||
|             .keys() | ||||
|             .next_back() | ||||
|             .map(|c| c.unwrap()) | ||||
|         { | ||||
|             // Remove old marker (There should at most one)
 | ||||
|             loop { | ||||
|                 if !current.starts_with(&prefix) { | ||||
|                     // We're in another room
 | ||||
|                     break; | ||||
|                 } | ||||
|                 if current.rsplitn(2, |&b| b == 0xff).next().unwrap() | ||||
|                     == user_id.to_string().as_bytes() | ||||
|                 { | ||||
|                     // This is the old room_latest
 | ||||
|                     self.db.roomlatestid_roomlatest.remove(current).unwrap(); | ||||
|                     break; | ||||
|                 } | ||||
|                 // Else, try the event before that
 | ||||
|                 if let Some((k, _)) = self.db.roomlatestid_roomlatest.get_lt(current).unwrap() { | ||||
|                     current = k; | ||||
|                 } else { | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         // Increment the last index and use that
 | ||||
|         let index = utils::u64_from_bytes( | ||||
|             &self | ||||
|                 .db | ||||
|                 .pduid_pdu | ||||
|                 .update_and_fetch(b"n", utils::increment) | ||||
|                 .unwrap() | ||||
|                 .unwrap(), | ||||
|         ); | ||||
| 
 | ||||
|         let mut room_latest_id = prefix; | ||||
|         room_latest_id.extend_from_slice(&index.to_be_bytes()); | ||||
|         room_latest_id.push(0xff); | ||||
|         room_latest_id.extend_from_slice(&user_id.to_string().as_bytes()); | ||||
| 
 | ||||
|         self.db | ||||
|             .roomlatestid_roomlatest | ||||
|             .insert(room_latest_id, &*serde_json::to_string(&event).unwrap()) | ||||
|             .unwrap(); | ||||
|     } | ||||
| 
 | ||||
|     /// Returns a vector of the most recent read_receipts in a room that happened after the event with id `since`.
 | ||||
|     pub fn roomlatests_since(&self, room_id: &RoomId, since: u64) -> Vec<EduEvent> { | ||||
|         let mut room_latests = Vec::new(); | ||||
| 
 | ||||
|         let mut prefix = vec![b'd']; | ||||
|         prefix.extend_from_slice(room_id.to_string().as_bytes()); | ||||
|         prefix.push(0xff); | ||||
| 
 | ||||
|         let mut current = prefix.clone(); | ||||
|         current.extend_from_slice(&since.to_be_bytes()); | ||||
| 
 | ||||
|         while let Some((key, value)) = self.db.roomlatestid_roomlatest.get_gt(¤t).unwrap() { | ||||
|             if key.starts_with(&prefix) { | ||||
|                 current = key.to_vec(); | ||||
|                 room_latests.push( | ||||
|                     serde_json::from_slice::<EventResult<_>>(&value) | ||||
|                         .expect("room_latest in db is valid") | ||||
|                         .into_result() | ||||
|                         .expect("room_latest in db is valid"), | ||||
|                 ); | ||||
|             } else { | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         room_latests | ||||
|     } | ||||
| 
 | ||||
|     pub fn debug(&self) { | ||||
|         self.db.debug(); | ||||
|     } | ||||
|  |  | |||
|  | @ -57,18 +57,23 @@ pub struct Database { | |||
|     pub userid_avatarurl: sled::Tree, | ||||
|     pub deviceid_token: sled::Tree, | ||||
|     pub token_userid: sled::Tree, | ||||
|     pub pduid_pdu: sled::Tree, // PduId = RoomId + Since
 | ||||
|     pub pduid_pdu: sled::Tree, // PduId = 'd' + RoomId + Since (global since counter is at 'n')
 | ||||
|     pub eventid_pduid: sled::Tree, | ||||
|     pub roomid_pduleaves: MultiValue, | ||||
|     pub roomid_userids: MultiValue, | ||||
|     pub userid_roomids: MultiValue, | ||||
|     // EDUs:
 | ||||
|     pub roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Since + UserId TODO: Types
 | ||||
|     pub timeofremoval_roomrelevants: MultiValue, // Typing
 | ||||
|     pub globalallid_globalall: sled::Tree,   // ToDevice, GlobalAllId = UserId + Since
 | ||||
|     pub globallatestid_globallatest: sled::Tree, // Presence, GlobalLatestId = Since + Type + UserId
 | ||||
|     _db: sled::Db, | ||||
| } | ||||
| 
 | ||||
| impl Database { | ||||
|     /// Tries to remove the old database but ignores all errors.
 | ||||
|     pub fn try_remove(hostname: &str) { | ||||
|         let mut path = ProjectDirs::from("xyz", "koesters", "matrixserver") | ||||
|         let mut path = ProjectDirs::from("xyz", "koesters", "conduit") | ||||
|             .unwrap() | ||||
|             .data_dir() | ||||
|             .to_path_buf(); | ||||
|  | @ -78,7 +83,7 @@ impl Database { | |||
| 
 | ||||
|     /// Load an existing database or create a new one.
 | ||||
|     pub fn load_or_create(hostname: &str) -> Self { | ||||
|         let mut path = ProjectDirs::from("xyz", "koesters", "matrixserver") | ||||
|         let mut path = ProjectDirs::from("xyz", "koesters", "conduit") | ||||
|             .unwrap() | ||||
|             .data_dir() | ||||
|             .to_path_buf(); | ||||
|  | @ -97,6 +102,12 @@ impl Database { | |||
|             roomid_pduleaves: MultiValue(db.open_tree("roomid_pduleaves").unwrap()), | ||||
|             roomid_userids: MultiValue(db.open_tree("roomid_userids").unwrap()), | ||||
|             userid_roomids: MultiValue(db.open_tree("userid_roomids").unwrap()), | ||||
|             roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest").unwrap(), | ||||
|             timeofremoval_roomrelevants: MultiValue( | ||||
|                 db.open_tree("timeofremoval_roomrelevants").unwrap(), | ||||
|             ), | ||||
|             globalallid_globalall: db.open_tree("globalallid_globalall").unwrap(), | ||||
|             globallatestid_globallatest: db.open_tree("globallatestid_globallatest").unwrap(), | ||||
|             _db: db, | ||||
|         } | ||||
|     } | ||||
|  | @ -118,7 +129,7 @@ impl Database { | |||
|                 String::from_utf8_lossy(&v), | ||||
|             ); | ||||
|         } | ||||
|         println!("# UserId -> Displayname:"); | ||||
|         println!("\n# UserId -> Displayname:"); | ||||
|         for (k, v) in self.userid_displayname.iter().map(|r| r.unwrap()) { | ||||
|             println!( | ||||
|                 "{:?} -> {:?}", | ||||
|  | @ -126,7 +137,7 @@ impl Database { | |||
|                 String::from_utf8_lossy(&v), | ||||
|             ); | ||||
|         } | ||||
|         println!("# UserId -> AvatarURL:"); | ||||
|         println!("\n# UserId -> AvatarURL:"); | ||||
|         for (k, v) in self.userid_avatarurl.iter().map(|r| r.unwrap()) { | ||||
|             println!( | ||||
|                 "{:?} -> {:?}", | ||||
|  | @ -190,5 +201,41 @@ impl Database { | |||
|                 String::from_utf8_lossy(&v), | ||||
|             ); | ||||
|         } | ||||
|         println!("\n# RoomLatestId -> RoomLatest"); | ||||
|         for (k, v) in self.roomlatestid_roomlatest.iter().map(|r| r.unwrap()) { | ||||
|             println!( | ||||
|                 "{:?} -> {:?}", | ||||
|                 String::from_utf8_lossy(&k), | ||||
|                 String::from_utf8_lossy(&v), | ||||
|             ); | ||||
|         } | ||||
|         println!("\n# TimeOfRemoval -> RoomRelevants Id:"); | ||||
|         for (k, v) in self | ||||
|             .timeofremoval_roomrelevants | ||||
|             .iter_all() | ||||
|             .map(|r| r.unwrap()) | ||||
|         { | ||||
|             println!( | ||||
|                 "{:?} -> {:?}", | ||||
|                 String::from_utf8_lossy(&k), | ||||
|                 String::from_utf8_lossy(&v), | ||||
|             ); | ||||
|         } | ||||
|         println!("\n# GlobalAllId -> GlobalAll:"); | ||||
|         for (k, v) in self.globalallid_globalall.iter().map(|r| r.unwrap()) { | ||||
|             println!( | ||||
|                 "{:?} -> {:?}", | ||||
|                 String::from_utf8_lossy(&k), | ||||
|                 String::from_utf8_lossy(&v), | ||||
|             ); | ||||
|         } | ||||
|         println!("\n# GlobalLatestId -> GlobalLatest:"); | ||||
|         for (k, v) in self.globallatestid_globallatest.iter().map(|r| r.unwrap()) { | ||||
|             println!( | ||||
|                 "{:?} -> {:?}", | ||||
|                 String::from_utf8_lossy(&k), | ||||
|                 String::from_utf8_lossy(&v), | ||||
|             ); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -1,3 +1,4 @@ | |||
| use log::warn; | ||||
| use rocket::{ | ||||
|     data::{Data, FromData, FromDataFuture, Transform, TransformFuture, Transformed}, | ||||
|     http::Status, | ||||
|  | @ -106,7 +107,7 @@ where | |||
|                     }, | ||||
|                 }), | ||||
|                 Err(e) => { | ||||
|                     log::error!("{:?}", e); | ||||
|                     warn!("{:?}", e); | ||||
|                     Failure((Status::InternalServerError, ())) | ||||
|                 } | ||||
|             } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue