diff --git a/src/database.rs b/src/database.rs index 7a17e53..e0f9eec 100644 --- a/src/database.rs +++ b/src/database.rs @@ -24,13 +24,14 @@ use rocket::{ request::{FromRequest, Request}, Shutdown, State, }; -use ruma::{DeviceId, RoomId, ServerName, UserId}; +use ruma::{DeviceId, EventId, RoomId, ServerName, UserId}; use serde::{de::IgnoredAny, Deserialize}; use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, convert::TryFrom, fs::{self, remove_dir_all}, io::Write, + mem::size_of, ops::Deref, path::Path, sync::{Arc, Mutex, RwLock}, @@ -261,7 +262,12 @@ impl Database { userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?, statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?, + + shortroomid_roomid: builder.open_tree("shortroomid_roomid")?, + roomid_shortroomid: builder.open_tree("roomid_shortroomid")?, + stateid_shorteventid: builder.open_tree("stateid_shorteventid")?, + shortstatehash_statediff: builder.open_tree("shortstatehash_statediff")?, eventid_shorteventid: builder.open_tree("eventid_shorteventid")?, shorteventid_eventid: builder.open_tree("shorteventid_eventid")?, shorteventid_shortstatehash: builder.open_tree("shorteventid_shortstatehash")?, @@ -438,6 +444,334 @@ impl Database { println!("Migration: 5 -> 6 finished"); } + + fn load_shortstatehash_info( + shortstatehash: &[u8], + db: &Database, + lru: &mut LruCache< + Vec, + Vec<( + Vec, + HashSet>, + HashSet>, + HashSet>, + )>, + >, + ) -> Result< + Vec<( + Vec, // sstatehash + HashSet>, // full state + HashSet>, // added + HashSet>, // removed + )>, + > { + if let Some(result) = lru.get_mut(shortstatehash) { + return Ok(result.clone()); + } + + let value = db + .rooms + .shortstatehash_statediff + .get(shortstatehash)? + .ok_or_else(|| Error::bad_database("State hash does not exist"))?; + let parent = value[0..size_of::()].to_vec(); + + let mut add_mode = true; + let mut added = HashSet::new(); + let mut removed = HashSet::new(); + + let mut i = size_of::(); + while let Some(v) = value.get(i..i + 2 * size_of::()) { + if add_mode && v.starts_with(&0_u64.to_be_bytes()) { + add_mode = false; + i += size_of::(); + continue; + } + if add_mode { + added.insert(v.to_vec()); + } else { + removed.insert(v.to_vec()); + } + i += 2 * size_of::(); + } + + if parent != 0_u64.to_be_bytes() { + let mut response = load_shortstatehash_info(&parent, db, lru)?; + let mut state = response.last().unwrap().1.clone(); + state.extend(added.iter().cloned()); + for r in &removed { + state.remove(r); + } + + response.push((shortstatehash.to_vec(), state, added, removed)); + + lru.insert(shortstatehash.to_vec(), response.clone()); + Ok(response) + } else { + let mut response = Vec::new(); + response.push((shortstatehash.to_vec(), added.clone(), added, removed)); + lru.insert(shortstatehash.to_vec(), response.clone()); + Ok(response) + } + } + + fn update_shortstatehash_level( + current_shortstatehash: &[u8], + statediffnew: HashSet>, + statediffremoved: HashSet>, + diff_to_sibling: usize, + mut parent_states: Vec<( + Vec, // sstatehash + HashSet>, // full state + HashSet>, // added + HashSet>, // removed + )>, + db: &Database, + ) -> Result<()> { + let diffsum = statediffnew.len() + statediffremoved.len(); + + if parent_states.len() > 3 { + // Number of layers + // To many layers, we have to go deeper + let parent = parent_states.pop().unwrap(); + + let mut parent_new = parent.2; + let mut parent_removed = parent.3; + + for removed in statediffremoved { + if !parent_new.remove(&removed) { + parent_removed.insert(removed); + } + } + parent_new.extend(statediffnew); + + update_shortstatehash_level( + current_shortstatehash, + parent_new, + parent_removed, + diffsum, + parent_states, + db, + )?; + + return Ok(()); + } + + if parent_states.len() == 0 { + // There is no parent layer, create a new state + let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent + for new in &statediffnew { + value.extend_from_slice(&new); + } + + if !statediffremoved.is_empty() { + warn!("Tried to create new state with removals"); + } + + db.rooms + .shortstatehash_statediff + .insert(¤t_shortstatehash, &value)?; + + return Ok(()); + }; + + // Else we have two options. + // 1. We add the current diff on top of the parent layer. + // 2. We replace a layer above + + let parent = parent_states.pop().unwrap(); + let parent_diff = parent.2.len() + parent.3.len(); + + if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff { + // Diff too big, we replace above layer(s) + let mut parent_new = parent.2; + let mut parent_removed = parent.3; + + for removed in statediffremoved { + if !parent_new.remove(&removed) { + parent_removed.insert(removed); + } + } + + parent_new.extend(statediffnew); + update_shortstatehash_level( + current_shortstatehash, + parent_new, + parent_removed, + diffsum, + parent_states, + db, + )?; + } else { + // Diff small enough, we add diff as layer on top of parent + let mut value = parent.0.clone(); + for new in &statediffnew { + value.extend_from_slice(&new); + } + + if !statediffremoved.is_empty() { + value.extend_from_slice(&0_u64.to_be_bytes()); + for removed in &statediffremoved { + value.extend_from_slice(&removed); + } + } + + db.rooms + .shortstatehash_statediff + .insert(¤t_shortstatehash, &value)?; + } + + Ok(()) + } + + if db.globals.database_version()? < 7 { + // Upgrade state store + let mut lru = LruCache::new(1000); + let mut last_roomstates: HashMap> = HashMap::new(); + let mut current_sstatehash: Vec = Vec::new(); + let mut current_room = None; + let mut current_state = HashSet::new(); + let mut counter = 0; + for (k, seventid) in db._db.open_tree("stateid_shorteventid")?.iter() { + let sstatehash = k[0..size_of::()].to_vec(); + let sstatekey = k[size_of::()..].to_vec(); + if sstatehash != current_sstatehash { + if !current_sstatehash.is_empty() { + counter += 1; + println!("counter: {}", counter); + let current_room = current_room.as_ref().unwrap(); + let last_roomsstatehash = last_roomstates.get(¤t_room); + + let states_parents = last_roomsstatehash.map_or_else( + || Ok(Vec::new()), + |last_roomsstatehash| { + load_shortstatehash_info(&last_roomsstatehash, &db, &mut lru) + }, + )?; + + let (statediffnew, statediffremoved) = + if let Some(parent_stateinfo) = states_parents.last() { + let statediffnew = current_state + .difference(&parent_stateinfo.1) + .cloned() + .collect::>(); + + let statediffremoved = parent_stateinfo + .1 + .difference(¤t_state) + .cloned() + .collect::>(); + + (statediffnew, statediffremoved) + } else { + (current_state, HashSet::new()) + }; + + update_shortstatehash_level( + ¤t_sstatehash, + statediffnew, + statediffremoved, + 2, // every state change is 2 event changes on average + states_parents, + &db, + )?; + + /* + let mut tmp = load_shortstatehash_info(¤t_sstatehash, &db)?; + let state = tmp.pop().unwrap(); + println!( + "{}\t{}{:?}: {:?} + {:?} - {:?}", + current_room, + " ".repeat(tmp.len()), + utils::u64_from_bytes(¤t_sstatehash).unwrap(), + tmp.last().map(|b| utils::u64_from_bytes(&b.0).unwrap()), + state + .2 + .iter() + .map(|b| utils::u64_from_bytes(&b[size_of::()..]).unwrap()) + .collect::>(), + state + .3 + .iter() + .map(|b| utils::u64_from_bytes(&b[size_of::()..]).unwrap()) + .collect::>() + ); + */ + + last_roomstates.insert(current_room.clone(), current_sstatehash); + } + current_state = HashSet::new(); + current_sstatehash = sstatehash; + + let event_id = db + .rooms + .shorteventid_eventid + .get(&seventid) + .unwrap() + .unwrap(); + let event_id = + EventId::try_from(utils::string_from_bytes(&event_id).unwrap()) + .unwrap(); + let pdu = db.rooms.get_pdu(&event_id).unwrap().unwrap(); + + if Some(&pdu.room_id) != current_room.as_ref() { + current_room = Some(pdu.room_id.clone()); + } + } + + let mut val = sstatekey; + val.extend_from_slice(&seventid); + current_state.insert(val); + } + + db.globals.bump_database_version(7)?; + + println!("Migration: 6 -> 7 finished"); + } + + if db.globals.database_version()? < 8 { + // Generate short room ids for all rooms + for (room_id, _) in db.rooms.roomid_shortstatehash.iter() { + let shortroomid = db.globals.next_count()?.to_be_bytes(); + db.rooms.roomid_shortroomid.insert(&room_id, &shortroomid)?; + db.rooms.shortroomid_roomid.insert(&shortroomid, &room_id)?; + } + // Update pduids db layout + for (key, v) in db.rooms.pduid_pdu.iter() { + let mut parts = key.splitn(2, |&b| b == 0xff); + let room_id = parts.next().unwrap(); + let count = parts.next().unwrap(); + + let short_room_id = db.rooms.roomid_shortroomid.get(&room_id)?.unwrap(); + + let mut new_key = short_room_id; + new_key.extend_from_slice(count); + + println!("{:?}", new_key); + } + + // Update tokenids db layout + for (key, _) in db.rooms.tokenids.iter() { + let mut parts = key.splitn(4, |&b| b == 0xff); + let room_id = parts.next().unwrap(); + let word = parts.next().unwrap(); + let _pdu_id_room = parts.next().unwrap(); + let pdu_id_count = parts.next().unwrap(); + + let short_room_id = db.rooms.roomid_shortroomid.get(&room_id)?.unwrap(); + let mut new_key = short_room_id; + new_key.extend_from_slice(word); + new_key.push(0xff); + new_key.extend_from_slice(pdu_id_count); + println!("{:?}", new_key); + } + + db.globals.bump_database_version(8)?; + + println!("Migration: 7 -> 8 finished"); + } + + panic!(); } let guard = db.read().await; diff --git a/src/database/abstraction/sqlite.rs b/src/database/abstraction/sqlite.rs index f420021..f37dd9e 100644 --- a/src/database/abstraction/sqlite.rs +++ b/src/database/abstraction/sqlite.rs @@ -223,10 +223,7 @@ impl Tree for SqliteTable { let statement = Box::leak(Box::new( guard - .prepare(&format!( - "SELECT key, value FROM {} ORDER BY key ASC", - &self.name - )) + .prepare(&format!("SELECT key, value FROM {} ORDER BY key ASC", &self.name)) .unwrap(), )); diff --git a/src/database/rooms.rs b/src/database/rooms.rs index 88878e9..a3a1c41 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -47,7 +47,7 @@ pub struct Rooms { pub(super) aliasid_alias: Arc, // AliasId = RoomId + Count pub(super) publicroomids: Arc, - pub(super) tokenids: Arc, // TokenId = RoomId + Token + PduId + pub(super) tokenids: Arc, // TokenId = ShortRoomId + Token + PduIdCount /// Participating servers in a room. pub(super) roomserverids: Arc, // RoomServerId = RoomId + ServerName @@ -71,14 +71,18 @@ pub struct Rooms { pub(super) shorteventid_shortstatehash: Arc, /// StateKey = EventType + StateKey, ShortStateKey = Count pub(super) statekey_shortstatekey: Arc, + + pub(super) shortroomid_roomid: Arc, + pub(super) roomid_shortroomid: Arc, + pub(super) shorteventid_eventid: Arc, - /// ShortEventId = Count pub(super) eventid_shorteventid: Arc, - /// ShortEventId = Count + pub(super) statehash_shortstatehash: Arc, /// ShortStateHash = Count - /// StateId = ShortStateHash + ShortStateKey + /// StateId = ShortStateHash pub(super) stateid_shorteventid: Arc, + pub(super) shortstatehash_statediff: Arc, // StateDiff = parent (or 0) + (shortstatekey+shorteventid++) + 0_u64 + (shortstatekey+shorteventid--) /// RoomId + EventId -> outlier PDU. /// Any pdu that has passed the steps 1-8 in the incoming event /federation/send/txn.