improve: presence
This commit is contained in:
		
							parent
							
								
									05f9d927b8
								
							
						
					
					
						commit
						d891bbb5dc
					
				
					 5 changed files with 304 additions and 149 deletions
				
			
		|  | @ -18,6 +18,7 @@ rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_p | ||||||
| 
 | 
 | ||||||
| tokio = "0.2.22" # Used for long polling | tokio = "0.2.22" # Used for long polling | ||||||
| ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"], rev = "d5d2d1d893fa12d27960e4c58d6c09b215d06e95" } # Used for matrix spec type definitions and helpers | ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"], rev = "d5d2d1d893fa12d27960e4c58d6c09b215d06e95" } # Used for matrix spec type definitions and helpers | ||||||
|  | #ruma = { path = "../ruma/ruma", features = ["rand", "client-api", "federation-api", "unstable-pre-spec", "unstable-synapse-quirks"] } | ||||||
| sled = "0.32.0" # Used for storing data permanently | sled = "0.32.0" # Used for storing data permanently | ||||||
| log = "0.4.8" # Used for emitting log entries | log = "0.4.8" # Used for emitting log entries | ||||||
| http = "0.2.1" # Used for rocket<->ruma conversions | http = "0.2.1" # Used for rocket<->ruma conversions | ||||||
|  |  | ||||||
|  | @ -1,5 +1,5 @@ | ||||||
| use std::{ | use std::{ | ||||||
|     collections::BTreeMap, |     collections::{hash_map, BTreeMap, HashMap}, | ||||||
|     convert::{TryFrom, TryInto}, |     convert::{TryFrom, TryInto}, | ||||||
|     time::{Duration, SystemTime}, |     time::{Duration, SystemTime}, | ||||||
| }; | }; | ||||||
|  | @ -645,7 +645,7 @@ pub fn set_displayname_route( | ||||||
|     db.users |     db.users | ||||||
|         .set_displayname(&sender_id, body.displayname.clone())?; |         .set_displayname(&sender_id, body.displayname.clone())?; | ||||||
| 
 | 
 | ||||||
|     // Send a new membership event into all joined rooms
 |     // Send a new membership event and presence update into all joined rooms
 | ||||||
|     for room_id in db.rooms.rooms_joined(&sender_id) { |     for room_id in db.rooms.rooms_joined(&sender_id) { | ||||||
|         let room_id = room_id?; |         let room_id = room_id?; | ||||||
|         db.rooms.append_pdu( |         db.rooms.append_pdu( | ||||||
|  | @ -675,27 +675,29 @@ pub fn set_displayname_route( | ||||||
|             None, |             None, | ||||||
|             &db.globals, |             &db.globals, | ||||||
|         )?; |         )?; | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     // Presence update
 |         // Presence update
 | ||||||
|     db.global_edus.update_presence( |         db.rooms.edus.update_presence( | ||||||
|         ruma::events::presence::PresenceEvent { |             &sender_id, | ||||||
|             content: ruma::events::presence::PresenceEventContent { |             &room_id, | ||||||
|                 avatar_url: db.users.avatar_url(&sender_id)?, |             ruma::events::presence::PresenceEvent { | ||||||
|                 currently_active: None, |                 content: ruma::events::presence::PresenceEventContent { | ||||||
|                 displayname: db.users.displayname(&sender_id)?, |                     avatar_url: db.users.avatar_url(&sender_id)?, | ||||||
|                 last_active_ago: Some( |                     currently_active: None, | ||||||
|                     utils::millis_since_unix_epoch() |                     displayname: db.users.displayname(&sender_id)?, | ||||||
|                         .try_into() |                     last_active_ago: Some( | ||||||
|                         .expect("time is valid"), |                         utils::millis_since_unix_epoch() | ||||||
|                 ), |                             .try_into() | ||||||
|                 presence: ruma::presence::PresenceState::Online, |                             .expect("time is valid"), | ||||||
|                 status_msg: None, |                     ), | ||||||
|  |                     presence: ruma::presence::PresenceState::Online, | ||||||
|  |                     status_msg: None, | ||||||
|  |                 }, | ||||||
|  |                 sender: sender_id.clone(), | ||||||
|             }, |             }, | ||||||
|             sender: sender_id.clone(), |             &db.globals, | ||||||
|         }, |         )?; | ||||||
|         &db.globals, |     } | ||||||
|     )?; |  | ||||||
| 
 | 
 | ||||||
|     Ok(set_display_name::Response.into()) |     Ok(set_display_name::Response.into()) | ||||||
| } | } | ||||||
|  | @ -739,7 +741,7 @@ pub fn set_avatar_url_route( | ||||||
|     db.users |     db.users | ||||||
|         .set_avatar_url(&sender_id, body.avatar_url.clone())?; |         .set_avatar_url(&sender_id, body.avatar_url.clone())?; | ||||||
| 
 | 
 | ||||||
|     // Send a new membership event into all joined rooms
 |     // Send a new membership event and presence update into all joined rooms
 | ||||||
|     for room_id in db.rooms.rooms_joined(&sender_id) { |     for room_id in db.rooms.rooms_joined(&sender_id) { | ||||||
|         let room_id = room_id?; |         let room_id = room_id?; | ||||||
|         db.rooms.append_pdu( |         db.rooms.append_pdu( | ||||||
|  | @ -769,27 +771,29 @@ pub fn set_avatar_url_route( | ||||||
|             None, |             None, | ||||||
|             &db.globals, |             &db.globals, | ||||||
|         )?; |         )?; | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     // Presence update
 |         // Presence update
 | ||||||
|     db.global_edus.update_presence( |         db.rooms.edus.update_presence( | ||||||
|         ruma::events::presence::PresenceEvent { |             &sender_id, | ||||||
|             content: ruma::events::presence::PresenceEventContent { |             &room_id, | ||||||
|                 avatar_url: db.users.avatar_url(&sender_id)?, |             ruma::events::presence::PresenceEvent { | ||||||
|                 currently_active: None, |                 content: ruma::events::presence::PresenceEventContent { | ||||||
|                 displayname: db.users.displayname(&sender_id)?, |                     avatar_url: db.users.avatar_url(&sender_id)?, | ||||||
|                 last_active_ago: Some( |                     currently_active: None, | ||||||
|                     utils::millis_since_unix_epoch() |                     displayname: db.users.displayname(&sender_id)?, | ||||||
|                         .try_into() |                     last_active_ago: Some( | ||||||
|                         .expect("time is valid"), |                         utils::millis_since_unix_epoch() | ||||||
|                 ), |                             .try_into() | ||||||
|                 presence: ruma::presence::PresenceState::Online, |                             .expect("time is valid"), | ||||||
|                 status_msg: None, |                     ), | ||||||
|  |                     presence: ruma::presence::PresenceState::Online, | ||||||
|  |                     status_msg: None, | ||||||
|  |                 }, | ||||||
|  |                 sender: sender_id.clone(), | ||||||
|             }, |             }, | ||||||
|             sender: sender_id.clone(), |             &db.globals, | ||||||
|         }, |         )?; | ||||||
|         &db.globals, |     } | ||||||
|     )?; |  | ||||||
| 
 | 
 | ||||||
|     Ok(set_avatar_url::Response.into()) |     Ok(set_avatar_url::Response.into()) | ||||||
| } | } | ||||||
|  | @ -844,24 +848,30 @@ pub fn set_presence_route( | ||||||
| ) -> ConduitResult<set_presence::Response> { | ) -> ConduitResult<set_presence::Response> { | ||||||
|     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); |     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); | ||||||
| 
 | 
 | ||||||
|     db.global_edus.update_presence( |     for room_id in db.rooms.rooms_joined(&sender_id) { | ||||||
|         ruma::events::presence::PresenceEvent { |         let room_id = room_id?; | ||||||
|             content: ruma::events::presence::PresenceEventContent { | 
 | ||||||
|                 avatar_url: db.users.avatar_url(&sender_id)?, |         db.rooms.edus.update_presence( | ||||||
|                 currently_active: None, |             &sender_id, | ||||||
|                 displayname: db.users.displayname(&sender_id)?, |             &room_id, | ||||||
|                 last_active_ago: Some( |             ruma::events::presence::PresenceEvent { | ||||||
|                     utils::millis_since_unix_epoch() |                 content: ruma::events::presence::PresenceEventContent { | ||||||
|                         .try_into() |                     avatar_url: db.users.avatar_url(&sender_id)?, | ||||||
|                         .expect("time is valid"), |                     currently_active: None, | ||||||
|                 ), |                     displayname: db.users.displayname(&sender_id)?, | ||||||
|                 presence: body.presence, |                     last_active_ago: Some( | ||||||
|                 status_msg: body.status_msg.clone(), |                         utils::millis_since_unix_epoch() | ||||||
|  |                             .try_into() | ||||||
|  |                             .expect("time is valid"), | ||||||
|  |                     ), | ||||||
|  |                     presence: body.presence, | ||||||
|  |                     status_msg: body.status_msg.clone(), | ||||||
|  |                 }, | ||||||
|  |                 sender: sender_id.clone(), | ||||||
|             }, |             }, | ||||||
|             sender: sender_id.clone(), |             &db.globals, | ||||||
|         }, |         )?; | ||||||
|         &db.globals, |     } | ||||||
|     )?; |  | ||||||
| 
 | 
 | ||||||
|     Ok(set_presence::Response.into()) |     Ok(set_presence::Response.into()) | ||||||
| } | } | ||||||
|  | @ -2492,6 +2502,9 @@ pub async fn sync_events_route( | ||||||
|     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); |     let sender_id = body.sender_id.as_ref().expect("user is authenticated"); | ||||||
|     let device_id = body.device_id.as_ref().expect("user is authenticated"); |     let device_id = body.device_id.as_ref().expect("user is authenticated"); | ||||||
| 
 | 
 | ||||||
|  |     // TODO: match body.set_presence {
 | ||||||
|  |     db.rooms.edus.ping_presence(&sender_id)?; | ||||||
|  | 
 | ||||||
|     // Setup watchers, so if there's no response, we can wait for them
 |     // Setup watchers, so if there's no response, we can wait for them
 | ||||||
|     let watcher = db.watch(sender_id, device_id); |     let watcher = db.watch(sender_id, device_id); | ||||||
| 
 | 
 | ||||||
|  | @ -2504,6 +2517,8 @@ pub async fn sync_events_route( | ||||||
|         .and_then(|string| string.parse().ok()) |         .and_then(|string| string.parse().ok()) | ||||||
|         .unwrap_or(0); |         .unwrap_or(0); | ||||||
| 
 | 
 | ||||||
|  |     let mut presence_updates = HashMap::new(); | ||||||
|  | 
 | ||||||
|     for room_id in db.rooms.rooms_joined(&sender_id) { |     for room_id in db.rooms.rooms_joined(&sender_id) { | ||||||
|         let room_id = room_id?; |         let room_id = room_id?; | ||||||
| 
 | 
 | ||||||
|  | @ -2735,6 +2750,40 @@ pub async fn sync_events_route( | ||||||
|         if !joined_room.is_empty() { |         if !joined_room.is_empty() { | ||||||
|             joined_rooms.insert(room_id.clone(), joined_room); |             joined_rooms.insert(room_id.clone(), joined_room); | ||||||
|         } |         } | ||||||
|  | 
 | ||||||
|  |         // Take presence updates from this room
 | ||||||
|  |         for (user_id, presence) in | ||||||
|  |             db.rooms | ||||||
|  |                 .edus | ||||||
|  |                 .presence_since(&room_id, since, &db.rooms, &db.globals)? | ||||||
|  |         { | ||||||
|  |             match presence_updates.entry(user_id) { | ||||||
|  |                 hash_map::Entry::Vacant(v) => { | ||||||
|  |                     v.insert(presence); | ||||||
|  |                 } | ||||||
|  |                 hash_map::Entry::Occupied(mut o) => { | ||||||
|  |                     let p = o.get_mut(); | ||||||
|  | 
 | ||||||
|  |                     // Update existing presence event with more info
 | ||||||
|  |                     p.content.presence = presence.content.presence; | ||||||
|  |                     if let Some(status_msg) = presence.content.status_msg { | ||||||
|  |                         p.content.status_msg = Some(status_msg); | ||||||
|  |                     } | ||||||
|  |                     if let Some(last_active_ago) = presence.content.last_active_ago { | ||||||
|  |                         p.content.last_active_ago = Some(last_active_ago); | ||||||
|  |                     } | ||||||
|  |                     if let Some(displayname) = presence.content.displayname { | ||||||
|  |                         p.content.displayname = Some(displayname); | ||||||
|  |                     } | ||||||
|  |                     if let Some(avatar_url) = presence.content.avatar_url { | ||||||
|  |                         p.content.avatar_url = Some(avatar_url); | ||||||
|  |                     } | ||||||
|  |                     if let Some(currently_active) = presence.content.currently_active { | ||||||
|  |                         p.content.currently_active = Some(currently_active); | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     let mut left_rooms = BTreeMap::new(); |     let mut left_rooms = BTreeMap::new(); | ||||||
|  | @ -2818,23 +2867,9 @@ pub async fn sync_events_route( | ||||||
|             invite: invited_rooms, |             invite: invited_rooms, | ||||||
|         }, |         }, | ||||||
|         presence: sync_events::Presence { |         presence: sync_events::Presence { | ||||||
|             events: db |             events: presence_updates | ||||||
|                 .global_edus |                 .into_iter() | ||||||
|                 .presence_since(since)? |                 .map(|(_, v)| Raw::from(v)) | ||||||
|                 .map(|edu| { |  | ||||||
|                     let mut edu = edu? |  | ||||||
|                         .deserialize() |  | ||||||
|                         .map_err(|_| Error::bad_database("EDU in database is invalid."))?; |  | ||||||
|                     if let Some(timestamp) = edu.content.last_active_ago { |  | ||||||
|                         let mut last_active_ago = utils::millis_since_unix_epoch() |  | ||||||
|                             .try_into() |  | ||||||
|                             .expect("time is valid"); |  | ||||||
|                         last_active_ago -= timestamp; |  | ||||||
|                         edu.content.last_active_ago = Some(last_active_ago); |  | ||||||
|                     } |  | ||||||
|                     Ok::<_, Error>(edu.into()) |  | ||||||
|                 }) |  | ||||||
|                 .filter_map(|edu| edu.ok()) // Filter out buggy events
 |  | ||||||
|                 .collect(), |                 .collect(), | ||||||
|         }, |         }, | ||||||
|         account_data: sync_events::AccountData { |         account_data: sync_events::AccountData { | ||||||
|  | @ -2878,8 +2913,8 @@ pub async fn sync_events_route( | ||||||
|         // Hang a few seconds so requests are not spammed
 |         // Hang a few seconds so requests are not spammed
 | ||||||
|         // Stop hanging if new info arrives
 |         // Stop hanging if new info arrives
 | ||||||
|         let mut duration = body.timeout.unwrap_or(Duration::default()); |         let mut duration = body.timeout.unwrap_or(Duration::default()); | ||||||
|         if duration.as_secs() > 10 { |         if duration.as_secs() > 30 { | ||||||
|             duration = Duration::from_secs(10); |             duration = Duration::from_secs(30); | ||||||
|         } |         } | ||||||
|         let mut delay = tokio::time::delay_for(duration); |         let mut delay = tokio::time::delay_for(duration); | ||||||
|         tokio::select! { |         tokio::select! { | ||||||
|  |  | ||||||
|  | @ -1,5 +1,4 @@ | ||||||
| pub(self) mod account_data; | pub(self) mod account_data; | ||||||
| pub(self) mod global_edus; |  | ||||||
| pub(self) mod globals; | pub(self) mod globals; | ||||||
| pub(self) mod key_backups; | pub(self) mod key_backups; | ||||||
| pub(self) mod media; | pub(self) mod media; | ||||||
|  | @ -22,7 +21,6 @@ pub struct Database { | ||||||
|     pub uiaa: uiaa::Uiaa, |     pub uiaa: uiaa::Uiaa, | ||||||
|     pub rooms: rooms::Rooms, |     pub rooms: rooms::Rooms, | ||||||
|     pub account_data: account_data::AccountData, |     pub account_data: account_data::AccountData, | ||||||
|     pub global_edus: global_edus::GlobalEdus, |  | ||||||
|     pub media: media::Media, |     pub media: media::Media, | ||||||
|     pub key_backups: key_backups::KeyBackups, |     pub key_backups: key_backups::KeyBackups, | ||||||
|     pub _db: sled::Db, |     pub _db: sled::Db, | ||||||
|  | @ -93,6 +91,8 @@ impl Database { | ||||||
|                     roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest")?, // Read receipts
 |                     roomlatestid_roomlatest: db.open_tree("roomlatestid_roomlatest")?, // Read receipts
 | ||||||
|                     roomactiveid_userid: db.open_tree("roomactiveid_userid")?, // Typing notifs
 |                     roomactiveid_userid: db.open_tree("roomactiveid_userid")?, // Typing notifs
 | ||||||
|                     roomid_lastroomactiveupdate: db.open_tree("roomid_lastroomactiveupdate")?, |                     roomid_lastroomactiveupdate: db.open_tree("roomid_lastroomactiveupdate")?, | ||||||
|  |                     presenceid_presence: db.open_tree("presenceid_presence")?, | ||||||
|  |                     userid_lastpresenceupdate: db.open_tree("userid_lastpresenceupdate")?, | ||||||
|                 }, |                 }, | ||||||
|                 pduid_pdu: db.open_tree("pduid_pdu")?, |                 pduid_pdu: db.open_tree("pduid_pdu")?, | ||||||
|                 eventid_pduid: db.open_tree("eventid_pduid")?, |                 eventid_pduid: db.open_tree("eventid_pduid")?, | ||||||
|  | @ -112,9 +112,6 @@ impl Database { | ||||||
|             account_data: account_data::AccountData { |             account_data: account_data::AccountData { | ||||||
|                 roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, |                 roomuserdataid_accountdata: db.open_tree("roomuserdataid_accountdata")?, | ||||||
|             }, |             }, | ||||||
|             global_edus: global_edus::GlobalEdus { |  | ||||||
|                 presenceid_presence: db.open_tree("presenceid_presence")?, // Presence
 |  | ||||||
|             }, |  | ||||||
|             media: media::Media { |             media: media::Media { | ||||||
|                 mediaid_file: db.open_tree("mediaid_file")?, |                 mediaid_file: db.open_tree("mediaid_file")?, | ||||||
|             }, |             }, | ||||||
|  | @ -146,9 +143,6 @@ impl Database { | ||||||
|                 .watch_prefix(&userdeviceid_prefix), |                 .watch_prefix(&userdeviceid_prefix), | ||||||
|         ); |         ); | ||||||
| 
 | 
 | ||||||
|         // TODO: only send for user they share a room with
 |  | ||||||
|         futures.push(self.global_edus.presenceid_presence.watch_prefix(b"")); |  | ||||||
| 
 |  | ||||||
|         futures.push(self.rooms.userroomid_joined.watch_prefix(&userid_prefix)); |         futures.push(self.rooms.userroomid_joined.watch_prefix(&userid_prefix)); | ||||||
|         futures.push(self.rooms.userroomid_invited.watch_prefix(&userid_prefix)); |         futures.push(self.rooms.userroomid_invited.watch_prefix(&userid_prefix)); | ||||||
|         futures.push(self.rooms.userroomid_left.watch_prefix(&userid_prefix)); |         futures.push(self.rooms.userroomid_left.watch_prefix(&userid_prefix)); | ||||||
|  |  | ||||||
|  | @ -1,62 +0,0 @@ | ||||||
| use crate::{Error, Result}; |  | ||||||
| use ruma::Raw; |  | ||||||
| 
 |  | ||||||
| pub struct GlobalEdus { |  | ||||||
|     //pub globalallid_globalall: sled::Tree, // ToDevice, GlobalAllId = UserId + Count
 |  | ||||||
|     pub(super) presenceid_presence: sled::Tree, // Presence, PresenceId = Count + UserId
 |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| impl GlobalEdus { |  | ||||||
|     /// Adds a global event which will be saved until a new event replaces it (e.g. presence updates).
 |  | ||||||
|     pub fn update_presence( |  | ||||||
|         &self, |  | ||||||
|         presence: ruma::events::presence::PresenceEvent, |  | ||||||
|         globals: &super::globals::Globals, |  | ||||||
|     ) -> Result<()> { |  | ||||||
|         // Remove old entry
 |  | ||||||
|         if let Some(old) = self |  | ||||||
|             .presenceid_presence |  | ||||||
|             .iter() |  | ||||||
|             .keys() |  | ||||||
|             .rev() |  | ||||||
|             .filter_map(|r| r.ok()) |  | ||||||
|             .find(|key| { |  | ||||||
|                 key.rsplit(|&b| b == 0xff) |  | ||||||
|                     .next() |  | ||||||
|                     .expect("rsplit always returns an element") |  | ||||||
|                     == presence.sender.to_string().as_bytes() |  | ||||||
|             }) |  | ||||||
|         { |  | ||||||
|             // This is the old global_latest
 |  | ||||||
|             self.presenceid_presence.remove(old)?; |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         let mut presence_id = globals.next_count()?.to_be_bytes().to_vec(); |  | ||||||
|         presence_id.push(0xff); |  | ||||||
|         presence_id.extend_from_slice(&presence.sender.to_string().as_bytes()); |  | ||||||
| 
 |  | ||||||
|         self.presenceid_presence.insert( |  | ||||||
|             presence_id, |  | ||||||
|             &*serde_json::to_string(&presence).expect("PresenceEvent can be serialized"), |  | ||||||
|         )?; |  | ||||||
| 
 |  | ||||||
|         Ok(()) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /// Returns an iterator over the most recent presence updates that happened after the event with id `since`.
 |  | ||||||
|     pub fn presence_since( |  | ||||||
|         &self, |  | ||||||
|         since: u64, |  | ||||||
|     ) -> Result<impl Iterator<Item = Result<Raw<ruma::events::presence::PresenceEvent>>>> { |  | ||||||
|         let first_possible_edu = (since + 1).to_be_bytes().to_vec(); // +1 so we don't send the event at since
 |  | ||||||
| 
 |  | ||||||
|         Ok(self |  | ||||||
|             .presenceid_presence |  | ||||||
|             .range(&*first_possible_edu..) |  | ||||||
|             .filter_map(|r| r.ok()) |  | ||||||
|             .map(|(_, v)| { |  | ||||||
|                 Ok(serde_json::from_slice(&v) |  | ||||||
|                     .map_err(|_| Error::bad_database("Invalid presence event in db."))?) |  | ||||||
|             })) |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  | @ -1,15 +1,25 @@ | ||||||
| use crate::{utils, Error, Result}; | use crate::{utils, Error, Result}; | ||||||
|  | use js_int::UInt; | ||||||
| use ruma::{ | use ruma::{ | ||||||
|     events::{AnyEvent as EduEvent, SyncEphemeralRoomEvent}, |     events::{ | ||||||
|  |         presence::{PresenceEvent, PresenceEventContent}, | ||||||
|  |         AnyEvent as EduEvent, SyncEphemeralRoomEvent, | ||||||
|  |     }, | ||||||
|  |     presence::PresenceState, | ||||||
|     Raw, RoomId, UserId, |     Raw, RoomId, UserId, | ||||||
| }; | }; | ||||||
| use std::convert::TryFrom; | use std::{ | ||||||
|  |     collections::HashMap, | ||||||
|  |     convert::{TryFrom, TryInto}, | ||||||
|  | }; | ||||||
| 
 | 
 | ||||||
| pub struct RoomEdus { | pub struct RoomEdus { | ||||||
|     pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User
 |     pub(in super::super) roomuserid_lastread: sled::Tree, // RoomUserId = Room + User
 | ||||||
|     pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId
 |     pub(in super::super) roomlatestid_roomlatest: sled::Tree, // Read Receipts, RoomLatestId = RoomId + Count + UserId
 | ||||||
|     pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count
 |     pub(in super::super) roomactiveid_userid: sled::Tree, // Typing, RoomActiveId = RoomId + TimeoutTime + Count
 | ||||||
|     pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count
 |     pub(in super::super) roomid_lastroomactiveupdate: sled::Tree, // LastRoomActiveUpdate = Count
 | ||||||
|  |     pub(in super::super) presenceid_presence: sled::Tree, // PresenceId = RoomId + Count + UserId
 | ||||||
|  |     pub(in super::super) userid_lastpresenceupdate: sled::Tree, // LastPresenceUpdate = Count
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl RoomEdus { | impl RoomEdus { | ||||||
|  | @ -263,4 +273,181 @@ impl RoomEdus { | ||||||
|             })?)) |             })?)) | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     /// Adds a presence event which will be saved until a new event replaces it.
 | ||||||
|  |     ///
 | ||||||
|  |     /// Note: This method takes a RoomId because presence updates are always bound to rooms to
 | ||||||
|  |     /// make sure users outside these rooms can't see them.
 | ||||||
|  |     pub fn update_presence( | ||||||
|  |         &self, | ||||||
|  |         user_id: &UserId, | ||||||
|  |         room_id: &RoomId, | ||||||
|  |         presence: ruma::events::presence::PresenceEvent, | ||||||
|  |         globals: &super::super::globals::Globals, | ||||||
|  |     ) -> Result<()> { | ||||||
|  |         // TODO: Remove old entry? Or maybe just wipe completely from time to time?
 | ||||||
|  | 
 | ||||||
|  |         let count = globals.next_count()?.to_be_bytes(); | ||||||
|  | 
 | ||||||
|  |         let mut presence_id = room_id.to_string().as_bytes().to_vec(); | ||||||
|  |         presence_id.push(0xff); | ||||||
|  |         presence_id.extend_from_slice(&count); | ||||||
|  |         presence_id.push(0xff); | ||||||
|  |         presence_id.extend_from_slice(&presence.sender.to_string().as_bytes()); | ||||||
|  | 
 | ||||||
|  |         self.presenceid_presence.insert( | ||||||
|  |             presence_id, | ||||||
|  |             &*serde_json::to_string(&presence).expect("PresenceEvent can be serialized"), | ||||||
|  |         )?; | ||||||
|  | 
 | ||||||
|  |         self.userid_lastpresenceupdate.insert( | ||||||
|  |             &user_id.to_string().as_bytes(), | ||||||
|  |             &utils::millis_since_unix_epoch().to_be_bytes(), | ||||||
|  |         )?; | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Resets the presence timeout, so the user will stay in their current presence state.
 | ||||||
|  |     pub fn ping_presence(&self, user_id: &UserId) -> Result<()> { | ||||||
|  |         self.userid_lastpresenceupdate.insert( | ||||||
|  |             &user_id.to_string().as_bytes(), | ||||||
|  |             &utils::millis_since_unix_epoch().to_be_bytes(), | ||||||
|  |         )?; | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns the timestamp of the last presence update of this user in millis since the unix epoch.
 | ||||||
|  |     pub fn last_presence_update(&self, user_id: &UserId) -> Result<Option<u64>> { | ||||||
|  |         self.userid_lastpresenceupdate | ||||||
|  |             .get(&user_id.to_string().as_bytes())? | ||||||
|  |             .map(|bytes| { | ||||||
|  |                 utils::u64_from_bytes(&bytes).map_err(|_| { | ||||||
|  |                     Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") | ||||||
|  |                 }) | ||||||
|  |             }) | ||||||
|  |             .transpose() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Sets all users to offline who have been quiet for too long.
 | ||||||
|  |     pub fn presence_maintain( | ||||||
|  |         &self, | ||||||
|  |         rooms: &super::Rooms, | ||||||
|  |         globals: &super::super::globals::Globals, | ||||||
|  |     ) -> Result<()> { | ||||||
|  |         let current_timestamp = utils::millis_since_unix_epoch(); | ||||||
|  | 
 | ||||||
|  |         for (user_id_bytes, last_timestamp) in self | ||||||
|  |             .userid_lastpresenceupdate | ||||||
|  |             .iter() | ||||||
|  |             .filter_map(|r| r.ok()) | ||||||
|  |             .filter_map(|(k, bytes)| { | ||||||
|  |                 Some(( | ||||||
|  |                     k, | ||||||
|  |                     utils::u64_from_bytes(&bytes) | ||||||
|  |                         .map_err(|_| { | ||||||
|  |                             Error::bad_database("Invalid timestamp in userid_lastpresenceupdate.") | ||||||
|  |                         }) | ||||||
|  |                         .ok()?, | ||||||
|  |                 )) | ||||||
|  |             }) | ||||||
|  |             .take_while(|(_, timestamp)| current_timestamp - timestamp > 5 * 60_000) // 5 Minutes
 | ||||||
|  |         { | ||||||
|  |             self.userid_lastpresenceupdate.remove(&user_id_bytes)?; | ||||||
|  | 
 | ||||||
|  |             // Send new presence events to set the user offline
 | ||||||
|  |             let count = globals.next_count()?.to_be_bytes(); | ||||||
|  |             let user_id = utils::string_from_bytes(&user_id_bytes) | ||||||
|  |                 .map_err(|_| { | ||||||
|  |                     Error::bad_database("Invalid UserId bytes in userid_lastpresenceupdate.") | ||||||
|  |                 })? | ||||||
|  |                 .try_into() | ||||||
|  |                 .map_err(|_| Error::bad_database("Invalid UserId in userid_lastpresenceupdate."))?; | ||||||
|  |             for room_id in rooms.rooms_joined(&user_id).filter_map(|r| r.ok()) { | ||||||
|  |                 let mut presence_id = room_id.to_string().as_bytes().to_vec(); | ||||||
|  |                 presence_id.push(0xff); | ||||||
|  |                 presence_id.extend_from_slice(&count); | ||||||
|  |                 presence_id.push(0xff); | ||||||
|  |                 presence_id.extend_from_slice(&user_id_bytes); | ||||||
|  | 
 | ||||||
|  |                 self.presenceid_presence.insert( | ||||||
|  |                     presence_id, | ||||||
|  |                     &*serde_json::to_string(&PresenceEvent { | ||||||
|  |                         content: PresenceEventContent { | ||||||
|  |                             avatar_url: None, | ||||||
|  |                             currently_active: None, | ||||||
|  |                             displayname: None, | ||||||
|  |                             last_active_ago: Some( | ||||||
|  |                                 last_timestamp.try_into().expect("time is valid"), | ||||||
|  |                             ), | ||||||
|  |                             presence: PresenceState::Offline, | ||||||
|  |                             status_msg: None, | ||||||
|  |                         }, | ||||||
|  |                         sender: user_id.clone(), | ||||||
|  |                     }) | ||||||
|  |                     .expect("PresenceEvent can be serialized"), | ||||||
|  |                 )?; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns an iterator over the most recent presence updates that happened after the event with id `since`.
 | ||||||
|  |     pub fn presence_since( | ||||||
|  |         &self, | ||||||
|  |         room_id: &RoomId, | ||||||
|  |         since: u64, | ||||||
|  |         rooms: &super::Rooms, | ||||||
|  |         globals: &super::super::globals::Globals, | ||||||
|  |     ) -> Result<HashMap<UserId, PresenceEvent>> { | ||||||
|  |         self.presence_maintain(rooms, globals)?; | ||||||
|  | 
 | ||||||
|  |         let mut prefix = room_id.to_string().as_bytes().to_vec(); | ||||||
|  |         prefix.push(0xff); | ||||||
|  | 
 | ||||||
|  |         let mut first_possible_edu = prefix.clone(); | ||||||
|  |         first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
 | ||||||
|  |         let mut hashmap = HashMap::new(); | ||||||
|  | 
 | ||||||
|  |         for (key, value) in self | ||||||
|  |             .presenceid_presence | ||||||
|  |             .range(&*first_possible_edu..) | ||||||
|  |             .filter_map(|r| r.ok()) | ||||||
|  |             .take_while(|(key, _)| key.starts_with(&prefix)) | ||||||
|  |         { | ||||||
|  |             let user_id = UserId::try_from( | ||||||
|  |                 utils::string_from_bytes( | ||||||
|  |                     key.rsplit(|&b| b == 0xff) | ||||||
|  |                         .next() | ||||||
|  |                         .expect("rsplit always returns an element"), | ||||||
|  |                 ) | ||||||
|  |                 .map_err(|_| Error::bad_database("Invalid UserId bytes in presenceid_presence."))?, | ||||||
|  |             ) | ||||||
|  |             .map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?; | ||||||
|  | 
 | ||||||
|  |             let mut presence = serde_json::from_slice::<PresenceEvent>(&value) | ||||||
|  |                 .map_err(|_| Error::bad_database("Invalid presence event in db."))?; | ||||||
|  | 
 | ||||||
|  |             let current_timestamp: UInt = utils::millis_since_unix_epoch() | ||||||
|  |                 .try_into() | ||||||
|  |                 .expect("time is valid"); | ||||||
|  | 
 | ||||||
|  |             if presence.content.presence == PresenceState::Online { | ||||||
|  |                 // Don't set last_active_ago when the user is online
 | ||||||
|  |                 presence.content.last_active_ago = None; | ||||||
|  |             } else { | ||||||
|  |                 // Convert from timestamp to duration
 | ||||||
|  |                 presence.content.last_active_ago = presence | ||||||
|  |                     .content | ||||||
|  |                     .last_active_ago | ||||||
|  |                     .map(|timestamp| current_timestamp - timestamp); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             hashmap.insert(user_id, presence); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         Ok(hashmap) | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue