diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs index 9c1e7c6..06e5adf 100644 --- a/src/client_server/membership.rs +++ b/src/client_server/membership.rs @@ -22,7 +22,10 @@ use ruma::{ EventId, Raw, RoomId, RoomVersionId, ServerName, UserId, }; use state_res::StateEvent; -use std::{collections::BTreeMap, collections::HashMap, collections::HashSet, convert::TryFrom, iter, sync::Arc}; +use std::{ + collections::BTreeMap, collections::HashMap, collections::HashSet, convert::TryFrom, iter, + sync::Arc, +}; #[cfg(feature = "conduit_bin")] use rocket::{get, post}; @@ -642,9 +645,15 @@ async fn join_room_by_id_helper( .expect("Found event_id in sorted events that is not in resolved state"); // We do not rebuild the PDU in this case only insert to DB - let pdu_id = db.rooms.append_pdu( + let count = db.globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_pdu( &PduEvent::from(&**pdu), &serde_json::to_value(&**pdu).expect("PDU is valid value"), + count, + pdu_id.clone().into(), &db.globals, &db.account_data, &db.sending, diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs index 6f41160..688d304 100644 --- a/src/client_server/sync.rs +++ b/src/client_server/sync.rs @@ -110,11 +110,11 @@ pub async fn sync_events_route( // since and the current room state, meaning there should be no updates. // The inner Option is None when there is an event, but there is no state hash associated // with it. This can happen for the RoomCreate event, so all updates should arrive. - let since_state_hash = db - .rooms - .pdus_after(sender_id, &room_id, since) // - 1 So we can get the event at since - .next() - .map(|pdu| db.rooms.pdu_state_hash(&pdu.ok()?.0).ok()?); + let first_pdu_after_since = db.rooms.pdus_after(sender_id, &room_id, since).next(); + + let since_state_hash = first_pdu_after_since + .as_ref() + .map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?); let since_members = since_state_hash.as_ref().map(|state_hash| { state_hash.as_ref().and_then(|state_hash| { diff --git a/src/database/rooms.rs b/src/database/rooms.rs index db473ff..35c3eac 100644 --- a/src/database/rooms.rs +++ b/src/database/rooms.rs @@ -438,25 +438,18 @@ impl Rooms { &self, pdu: &PduEvent, pdu_json: &serde_json::Value, + count: u64, + pdu_id: IVec, globals: &super::globals::Globals, account_data: &super::account_data::AccountData, sending: &super::sending::Sending, - ) -> Result> { + ) -> Result<()> { self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?; - // Increment the last index and use that - // This is also the next_batch/since value - let index = globals.next_count()?; - // Mark as read first so the sending client doesn't get a notification even if appending // fails self.edus - .private_read_set(&pdu.room_id, &pdu.sender, index, &globals)?; - - let room_id = pdu.room_id.clone(); - let mut pdu_id = room_id.as_bytes().to_vec(); - pdu_id.push(0xff); - pdu_id.extend_from_slice(&index.to_be_bytes()); + .private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?; self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?; @@ -537,7 +530,7 @@ impl Rooms { }, &UserId::try_from(format!("@conduit:{}", globals.server_name())) .expect("@conduit:server_name is valid"), - &room_id, + &pdu.room_id, &globals, &sending, &account_data, @@ -549,7 +542,7 @@ impl Rooms { _ => {} } - Ok(pdu_id) + Ok(()) } /// Generates a new StateHash and associates it with the incoming event. @@ -834,10 +827,27 @@ impl Rooms { .expect("json is object") .insert("event_id".to_owned(), pdu.event_id.to_string().into()); - let pdu_id = self.append_pdu(&pdu, &pdu_json, globals, account_data, sending)?; + // Increment the last index and use that + // This is also the next_batch/since value + let count = globals.next_count()?; + let mut pdu_id = room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + // We append to state before appending the pdu, so we don't have a moment in time with the + // pdu without it's state. This is okay because append_pdu can't fail. self.append_to_state(&pdu_id, &pdu)?; + self.append_pdu( + &pdu, + &pdu_json, + count, + pdu_id.clone().into(), + globals, + account_data, + sending, + )?; + for server in self .room_servers(room_id) .filter_map(|r| r.ok()) diff --git a/src/server_server.rs b/src/server_server.rs index b8b575e..3fefbd5 100644 --- a/src/server_server.rs +++ b/src/server_server.rs @@ -389,9 +389,19 @@ pub fn send_transaction_message_route<'a>( let pdu = serde_json::from_value::(value.clone()) .expect("all ruma pdus are conduit pdus"); if db.rooms.exists(&pdu.room_id)? { - let pdu_id = - db.rooms - .append_pdu(&pdu, &value, &db.globals, &db.account_data, &db.sending)?; + let count = db.globals.next_count()?; + let mut pdu_id = pdu.room_id.as_bytes().to_vec(); + pdu_id.push(0xff); + pdu_id.extend_from_slice(&count.to_be_bytes()); + db.rooms.append_pdu( + &pdu, + &value, + count, + pdu_id.clone().into(), + &db.globals, + &db.account_data, + &db.sending, + )?; db.rooms.append_to_state(&pdu_id, &pdu)?; } }