use crate::{utils, Database, PduEvent}; use ruma_events::{collections::only::Event as EduEvent, EventResult, EventType}; use ruma_federation_api::RoomV3Pdu; use ruma_identifiers::{EventId, RoomId, UserId}; use serde_json::json; use std::{ collections::HashMap, convert::{TryFrom, TryInto}, }; pub struct Data { hostname: String, db: Database, } impl Data { /// Load an existing database or create a new one. pub fn load_or_create(hostname: &str) -> Self { Self { hostname: hostname.to_owned(), db: Database::load_or_create(hostname), } } /// Get the hostname of the server. pub fn hostname(&self) -> &str { &self.hostname } /// Check if a user has an account by looking for an assigned password. pub fn user_exists(&self, user_id: &UserId) -> bool { self.db .userid_password .contains_key(user_id.to_string()) .unwrap() } /// Create a new user account by assigning them a password. pub fn user_add(&self, user_id: &UserId, password: Option) { self.db .userid_password .insert(user_id.to_string(), &*password.unwrap_or_default()) .unwrap(); } /// Find out which user an access token belongs to. pub fn user_from_token(&self, token: &str) -> Option { self.db .token_userid .get(token) .unwrap() .and_then(|bytes| (*utils::string_from_bytes(&bytes)).try_into().ok()) } pub fn users_all(&self) -> Vec { self.db .userid_password .iter() .keys() .map(|k| UserId::try_from(&*utils::string_from_bytes(&k.unwrap())).unwrap()) .collect() } /// Checks if the given password is equal to the one in the database. pub fn password_get(&self, user_id: &UserId) -> Option { self.db .userid_password .get(user_id.to_string()) .unwrap() .map(|bytes| utils::string_from_bytes(&bytes)) } /// Removes a displayname. pub fn displayname_remove(&self, user_id: &UserId) { self.db .userid_displayname .remove(user_id.to_string()) .unwrap(); } /// Set a new displayname. pub fn displayname_set(&self, user_id: &UserId, displayname: Option) { self.db .userid_displayname .insert(user_id.to_string(), &*displayname.unwrap_or_default()) .unwrap(); } /// Get a the displayname of a user. pub fn displayname_get(&self, user_id: &UserId) -> Option { self.db .userid_displayname .get(user_id.to_string()) .unwrap() .map(|bytes| utils::string_from_bytes(&bytes)) } /// Removes a avatar_url. pub fn avatar_url_remove(&self, user_id: &UserId) { self.db .userid_avatarurl .remove(user_id.to_string()) .unwrap(); } /// Set a new avatar_url. pub fn avatar_url_set(&self, user_id: &UserId, avatar_url: String) { self.db .userid_avatarurl .insert(user_id.to_string(), &*avatar_url) .unwrap(); } /// Get a the avatar_url of a user. pub fn avatar_url_get(&self, user_id: &UserId) -> Option { self.db .userid_avatarurl .get(user_id.to_string()) .unwrap() .map(|bytes| utils::string_from_bytes(&bytes)) } /// Add a new device to a user. pub fn device_add(&self, user_id: &UserId, device_id: &str) { if self .db .userid_deviceids .get_iter(&user_id.to_string().as_bytes()) .filter_map(|item| item.ok()) .map(|(_key, value)| value) .all(|device| device != device_id) { self.db .userid_deviceids .add(user_id.to_string().as_bytes(), device_id.into()); } } /// Replace the access token of one device. pub fn token_replace(&self, user_id: &UserId, device_id: &String, token: String) { // Make sure the device id belongs to the user debug_assert!(self .db .userid_deviceids .get_iter(&user_id.to_string().as_bytes()) .filter_map(|item| item.ok()) .map(|(_key, value)| value) .any(|device| device == device_id.as_bytes())); // Does the user have that device? // Remove old token let mut key = user_id.to_string().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(device_id.as_bytes()); if let Some(old_token) = self.db.userdeviceid_token.get(&key).unwrap() { self.db.token_userid.remove(old_token).unwrap(); // It will be removed from deviceid_token by the insert later } // Assign token to device_id self.db.userdeviceid_token.insert(key, &*token).unwrap(); // Assign token to user self.db .token_userid .insert(token, &*user_id.to_string()) .unwrap(); } pub fn room_join(&self, room_id: &RoomId, user_id: &UserId) -> bool { if !self.room_exists(room_id) { return false; } self.db.userid_roomids.add( user_id.to_string().as_bytes(), room_id.to_string().as_bytes().into(), ); self.db.roomid_userids.add( room_id.to_string().as_bytes(), user_id.to_string().as_bytes().into(), ); self.db.userid_inviteroomids.remove_value( user_id.to_string().as_bytes(), room_id.to_string().as_bytes(), ); self.pdu_append( room_id.clone(), user_id.clone(), EventType::RoomMember, json!({"membership": "join"}), None, Some(user_id.to_string()), ); true } pub fn rooms_joined(&self, user_id: &UserId) -> Vec { self.db .userid_roomids .get_iter(user_id.to_string().as_bytes()) .values() .map(|room_id| { RoomId::try_from(&*utils::string_from_bytes(&room_id.unwrap())) .expect("user joined valid room ids") }) .collect() } /// Check if a room exists by looking for PDUs in that room. pub fn room_exists(&self, room_id: &RoomId) -> bool { // Create the first part of the full pdu id let mut prefix = vec![b'd']; prefix.extend_from_slice(room_id.to_string().as_bytes()); prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id if let Some((key, _)) = self.db.pduid_pdu.get_gt(&prefix).unwrap() { if key.starts_with(&prefix) { true } else { false } } else { false } } pub fn rooms_all(&self) -> Vec { let mut room_ids = self .db .roomid_pduleaves .iter_all() .keys() .map(|key| { RoomId::try_from(&*utils::string_from_bytes( &key.unwrap() .iter() .skip(1) // skip "d" .copied() .take_while(|&x| x != 0xff) // until delimiter .collect::>(), )) .unwrap() }) .collect::>(); room_ids.dedup(); room_ids } pub fn room_users(&self, room_id: &RoomId) -> u32 { self.db .roomid_userids .get_iter(room_id.to_string().as_bytes()) .count() as u32 } pub fn room_state(&self, room_id: &RoomId) -> HashMap<(EventType, String), PduEvent> { let mut hashmap = HashMap::new(); for pdu in self .db .roomstateid_pdu .scan_prefix(&room_id.to_string().as_bytes()) .values() .map(|value| serde_json::from_slice::(&value.unwrap()).unwrap()) { hashmap.insert( ( pdu.kind.clone(), pdu.state_key .clone() .expect("state events have a state key"), ), pdu, ); } hashmap } pub fn room_invite(&self, sender: &UserId, room_id: &RoomId, user_id: &UserId) { self.pdu_append( room_id.clone(), sender.clone(), EventType::RoomMember, json!({"membership": "invite"}), None, Some(user_id.to_string()), ); self.db.userid_inviteroomids.add( &user_id.to_string().as_bytes(), room_id.to_string().as_bytes().into(), ); } pub fn rooms_invited(&self, user_id: &UserId) -> Vec { self.db .userid_inviteroomids .get_iter(&user_id.to_string().as_bytes()) .values() .map(|key| RoomId::try_from(&*utils::string_from_bytes(&key.unwrap())).unwrap()) .collect() } pub fn pdu_get(&self, event_id: &EventId) -> Option { self.db .eventid_pduid .get(event_id.to_string().as_bytes()) .unwrap() .map(|pdu_id| { serde_json::from_slice( &self .db .pduid_pdu .get(pdu_id) .unwrap() .expect("eventid_pduid in db is valid"), ) .expect("pdu is valid") }) } pub fn pdu_leaves_get(&self, room_id: &RoomId) -> Vec { let event_ids = self .db .roomid_pduleaves .get_iter(room_id.to_string().as_bytes()) .values() .map(|pdu_id| { EventId::try_from(&*utils::string_from_bytes(&pdu_id.unwrap())) .expect("pdu leaves are valid event ids") }) .collect(); event_ids } pub fn pdu_leaves_replace(&self, room_id: &RoomId, event_id: &EventId) { self.db .roomid_pduleaves .clear(room_id.to_string().as_bytes()); self.db.roomid_pduleaves.add( &room_id.to_string().as_bytes(), (*event_id.to_string()).into(), ); } /// Add a persisted data unit from this homeserver pub fn pdu_append( &self, room_id: RoomId, sender: UserId, event_type: EventType, content: serde_json::Value, unsigned: Option>, state_key: Option, ) -> EventId { // prev_events are the leaves of the current graph. This method removes all leaves from the // room and replaces them with our event // TODO: Make sure this isn't called twice in parallel let prev_events = self.pdu_leaves_get(&room_id); // Our depth is the maximum depth of prev_events + 1 let depth = prev_events .iter() .map(|event_id| { self.pdu_get(event_id) .expect("pdu in prev_events is valid") .depth .into() }) .max() .unwrap_or(0_u64) + 1; let mut pdu = PduEvent { event_id: EventId::try_from("$thiswillbefilledinlater").unwrap(), room_id: room_id.clone(), sender: sender.clone(), origin: self.hostname.clone(), origin_server_ts: utils::millis_since_unix_epoch().try_into().unwrap(), kind: event_type, content, state_key, prev_events, depth: depth.try_into().unwrap(), auth_events: Vec::new(), redacts: None, unsigned: unsigned.unwrap_or_default(), hashes: ruma_federation_api::EventHash { sha256: "aaa".to_owned(), }, signatures: HashMap::new(), }; // Generate event id pdu.event_id = EventId::try_from(&*format!( "${}", ruma_signatures::reference_hash(&serde_json::to_value(&pdu).unwrap()) .expect("ruma can calculate reference hashes") )) .expect("ruma's reference hashes are correct"); self.pdu_leaves_replace(&room_id, &pdu.event_id); // The new value will need a new index. We store the last used index in 'n' // The count will go up regardless of the room_id // This is also the next_batch/since value // Increment the last index and use that let index = utils::u64_from_bytes( &self .db .pduid_pdu .update_and_fetch(b"n", utils::increment) .unwrap() .unwrap(), ); let mut pdu_id = vec![b'd']; pdu_id.extend_from_slice(room_id.to_string().as_bytes()); pdu_id.push(0xff); // Add delimiter so we don't find rooms starting with the same id pdu_id.extend_from_slice(&index.to_be_bytes()); let pdu_json = serde_json::to_string(&pdu).unwrap(); self.db.pduid_pdu.insert(&pdu_id, &*pdu_json).unwrap(); self.db .eventid_pduid .insert(pdu.event_id.to_string(), pdu_id.clone()) .unwrap(); if let Some(state_key) = pdu.state_key { let mut key = room_id.to_string().as_bytes().to_vec(); key.push(0xff); key.extend_from_slice(dbg!(pdu.kind.to_string().as_bytes())); key.push(0xff); key.extend_from_slice(state_key.to_string().as_bytes()); self.db.roomstateid_pdu.insert(key, &*pdu_json).unwrap(); } pdu.event_id } /// Returns a vector of all PDUs in a room. pub fn pdus_all(&self, room_id: &RoomId) -> Vec { self.pdus_since(room_id, 0) } pub fn last_pdu_index(&self) -> u64 { let count_key: Vec = vec![b'n']; utils::u64_from_bytes( &self .db .pduid_pdu .get(&count_key) .unwrap() .unwrap_or_else(|| (&0_u64.to_be_bytes()).into()), ) } /// Returns a vector of all events in a room that happened after the event with id `since`. pub fn pdus_since(&self, room_id: &RoomId, since: u64) -> Vec { let mut pdus = Vec::new(); // Create the first part of the full pdu id let mut prefix = vec![b'd']; prefix.extend_from_slice(room_id.to_string().as_bytes()); prefix.push(0xff); // Add delimiter so we don't find rooms starting with the same id let mut current = prefix.clone(); current.extend_from_slice(&since.to_be_bytes()); while let Some((key, value)) = self.db.pduid_pdu.get_gt(¤t).unwrap() { if key.starts_with(&prefix) { current = key.to_vec(); pdus.push(serde_json::from_slice(&value).expect("pdu in db is valid")); } else { break; } } pdus } pub fn roomlatest_update(&self, user_id: &UserId, room_id: &RoomId, event: EduEvent) { let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); // Start with last if let Some(mut current) = self .db .roomlatestid_roomlatest .scan_prefix(&prefix) .keys() .next_back() .map(|c| c.unwrap()) { // Remove old marker (There should at most one) loop { if !current.starts_with(&prefix) { // We're in another room break; } if current.rsplitn(2, |&b| b == 0xff).next().unwrap() == user_id.to_string().as_bytes() { // This is the old room_latest self.db.roomlatestid_roomlatest.remove(current).unwrap(); break; } // Else, try the event before that if let Some((k, _)) = self.db.roomlatestid_roomlatest.get_lt(current).unwrap() { current = k; } else { break; } } } // Increment the last index and use that let index = utils::u64_from_bytes( &self .db .pduid_pdu .update_and_fetch(b"n", utils::increment) .unwrap() .unwrap(), ); let mut room_latest_id = prefix; room_latest_id.extend_from_slice(&index.to_be_bytes()); room_latest_id.push(0xff); room_latest_id.extend_from_slice(&user_id.to_string().as_bytes()); self.db .roomlatestid_roomlatest .insert(room_latest_id, &*serde_json::to_string(&event).unwrap()) .unwrap(); } /// Returns a vector of the most recent read_receipts in a room that happened after the event with id `since`. pub fn roomlatests_since(&self, room_id: &RoomId, since: u64) -> Vec { let mut room_latests = Vec::new(); let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); let mut current = prefix.clone(); current.extend_from_slice(&since.to_be_bytes()); while let Some((key, value)) = self.db.roomlatestid_roomlatest.get_gt(¤t).unwrap() { if key.starts_with(&prefix) { current = key.to_vec(); room_latests.push( serde_json::from_slice::>(&value) .expect("room_latest in db is valid") .into_result() .expect("room_latest in db is valid"), ); } else { break; } } room_latests } pub fn roomactive_add(&self, event: EduEvent, room_id: &RoomId, timeout: u64) { let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); let mut current = prefix.clone(); while let Some((key, _)) = self.db.roomactiveid_roomactive.get_gt(¤t).unwrap() { if key.starts_with(&prefix) && utils::u64_from_bytes(key.split(|&c| c == 0xff).nth(1).unwrap()) > utils::millis_since_unix_epoch().try_into().unwrap() { current = key.to_vec(); self.db.roomactiveid_roomactive.remove(¤t).unwrap(); } else { break; } } // Increment the last index and use that let index = utils::u64_from_bytes( &self .db .pduid_pdu .update_and_fetch(b"n", utils::increment) .unwrap() .unwrap(), ); let mut room_active_id = prefix; room_active_id.extend_from_slice(&timeout.to_be_bytes()); room_active_id.push(0xff); room_active_id.extend_from_slice(&index.to_be_bytes()); self.db .roomactiveid_roomactive .insert(room_active_id, &*serde_json::to_string(&event).unwrap()) .unwrap(); } pub fn roomactive_remove(&self, event: EduEvent, room_id: &RoomId) { let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); let mut current = prefix.clone(); let json = serde_json::to_string(&event).unwrap(); while let Some((key, value)) = self.db.roomactiveid_roomactive.get_gt(¤t).unwrap() { if key.starts_with(&prefix) { current = key.to_vec(); if value == json.as_bytes() { self.db.roomactiveid_roomactive.remove(¤t).unwrap(); break; } } else { break; } } } /// Returns a vector of the most recent read_receipts in a room that happened after the event with id `since`. pub fn roomactives_in(&self, room_id: &RoomId) -> Vec { let mut room_actives = Vec::new(); let mut prefix = room_id.to_string().as_bytes().to_vec(); prefix.push(0xff); let mut current = prefix.clone(); current.extend_from_slice(&utils::millis_since_unix_epoch().to_be_bytes()); while let Some((key, value)) = self.db.roomactiveid_roomactive.get_gt(¤t).unwrap() { if key.starts_with(&prefix) { current = key.to_vec(); room_actives.push( serde_json::from_slice::>(&value) .expect("room_active in db is valid") .into_result() .expect("room_active in db is valid"), ); } else { break; } } if room_actives.is_empty() { return vec![EduEvent::Typing(ruma_events::typing::TypingEvent { content: ruma_events::typing::TypingEventContent { user_ids: Vec::new(), }, room_id: None, // None because it can be inferred })]; } else { room_actives } } pub fn debug(&self) { self.db.debug(); } }