Merge pull request 'fix: random timeline reloads' (#227) from fix-random-timeline-reloads into master
Reviewed-on: https://git.koesters.xyz/timo/conduit/pulls/227
Commit 935546b25b
4 changed files with 53 additions and 24 deletions
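The shape of the change: Rooms::append_pdu no longer allocates the global counter or builds the timeline key itself. Each caller (the join helper, build_and_append_pdu, and the federation transaction route) now fetches the count and constructs the pduid before calling it, and that same count is the value later handed out as next_batch/since. The construction is inlined at every call site in the diff below; a minimal sketch of what it amounts to (the helper name is illustrative, not part of the PR):

use ruma::RoomId;

// Illustrative helper, not in the PR: the diff repeats this inline at each call site.
// A pduid is the room id bytes, a 0xff separator, then the count in big-endian form.
fn make_pdu_id(room_id: &RoomId, count: u64) -> Vec<u8> {
    let mut pdu_id = room_id.as_bytes().to_vec();
    pdu_id.push(0xff);
    pdu_id.extend_from_slice(&count.to_be_bytes());
    pdu_id
}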
@@ -22,7 +22,10 @@ use ruma::{
     EventId, Raw, RoomId, RoomVersionId, ServerName, UserId,
 };
 use state_res::StateEvent;
-use std::{collections::BTreeMap, collections::HashMap, collections::HashSet, convert::TryFrom, iter, sync::Arc};
+use std::{
+    collections::BTreeMap, collections::HashMap, collections::HashSet, convert::TryFrom, iter,
+    sync::Arc,
+};
 
 #[cfg(feature = "conduit_bin")]
 use rocket::{get, post};
@@ -642,9 +645,15 @@ async fn join_room_by_id_helper(
                 .expect("Found event_id in sorted events that is not in resolved state");
 
             // We do not rebuild the PDU in this case only insert to DB
-            let pdu_id = db.rooms.append_pdu(
+            let count = db.globals.next_count()?;
+            let mut pdu_id = room_id.as_bytes().to_vec();
+            pdu_id.push(0xff);
+            pdu_id.extend_from_slice(&count.to_be_bytes());
+            db.rooms.append_pdu(
                 &PduEvent::from(&**pdu),
                 &serde_json::to_value(&**pdu).expect("PDU is valid value"),
+                count,
+                pdu_id.clone().into(),
                 &db.globals,
                 &db.account_data,
                 &db.sending,
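A detail worth calling out in the hunk above: the count is appended with to_be_bytes(). Big-endian bytes compare the same way as the numbers they encode, so the raw pduid keys sort in timeline order in an ordered keyspace; little-endian bytes would not. A quick standalone check (illustrative only, not part of the PR):

fn main() {
    // Big-endian byte strings order like the numbers they encode...
    assert!(255u64.to_be_bytes() < 256u64.to_be_bytes());
    // ...little-endian byte strings do not (the 0xFF in the first byte wins).
    assert!(255u64.to_le_bytes() > 256u64.to_le_bytes());
}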
@@ -110,11 +110,11 @@ pub async fn sync_events_route(
         // since and the current room state, meaning there should be no updates.
         // The inner Option is None when there is an event, but there is no state hash associated
         // with it. This can happen for the RoomCreate event, so all updates should arrive.
-        let since_state_hash = db
-            .rooms
-            .pdus_after(sender_id, &room_id, since) // - 1 So we can get the event at since
-            .next()
-            .map(|pdu| db.rooms.pdu_state_hash(&pdu.ok()?.0).ok()?);
+        let first_pdu_after_since = db.rooms.pdus_after(sender_id, &room_id, since).next();
+
+        let since_state_hash = first_pdu_after_since
+            .as_ref()
+            .map(|pdu| db.rooms.pdu_state_hash(&pdu.as_ref().ok()?.0).ok()?);
 
         let since_members = since_state_hash.as_ref().map(|state_hash| {
             state_hash.as_ref().and_then(|state_hash| {
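The nested Option that falls out of this is easy to misread, so here is the structure spelled out, following the comments in the hunk (a reduced sketch with placeholder types, not the real conduit API):

// Reduced sketch: `StateHash` stands in for conduit's actual state hash type.
type StateHash = Vec<u8>;

fn describe(since_state_hash: &Option<Option<StateHash>>) -> &'static str {
    match since_state_hash {
        // pdus_after(..).next() returned nothing: no PDU after `since` at all.
        None => "no event after since",
        // A PDU exists but carries no state hash (e.g. the RoomCreate event),
        // so all updates should arrive.
        Some(None) => "event without an associated state hash",
        // Normal case: the state hash at `since` is available for comparison.
        Some(Some(_)) => "event with a state hash",
    }
}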
@@ -438,25 +438,18 @@ impl Rooms {
         &self,
         pdu: &PduEvent,
         pdu_json: &serde_json::Value,
+        count: u64,
+        pdu_id: IVec,
         globals: &super::globals::Globals,
         account_data: &super::account_data::AccountData,
         sending: &super::sending::Sending,
-    ) -> Result<Vec<u8>> {
+    ) -> Result<()> {
         self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?;
 
-        // Increment the last index and use that
-        // This is also the next_batch/since value
-        let index = globals.next_count()?;
-
         // Mark as read first so the sending client doesn't get a notification even if appending
         // fails
         self.edus
-            .private_read_set(&pdu.room_id, &pdu.sender, index, &globals)?;
-
-        let room_id = pdu.room_id.clone();
-        let mut pdu_id = room_id.as_bytes().to_vec();
-        pdu_id.push(0xff);
-        pdu_id.extend_from_slice(&index.to_be_bytes());
+            .private_read_set(&pdu.room_id, &pdu.sender, count, &globals)?;
 
         self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?;
 
@@ -537,7 +530,7 @@ impl Rooms {
                                 },
                                 &UserId::try_from(format!("@conduit:{}", globals.server_name()))
                                     .expect("@conduit:server_name is valid"),
-                                &room_id,
+                                &pdu.room_id,
                                 &globals,
                                 &sending,
                                 &account_data,
@@ -549,7 +542,7 @@ impl Rooms {
             _ => {}
         }
 
-        Ok(pdu_id)
+        Ok(())
     }
 
     /// Generates a new StateHash and associates it with the incoming event.
@@ -834,10 +827,27 @@ impl Rooms {
             .expect("json is object")
             .insert("event_id".to_owned(), pdu.event_id.to_string().into());
 
-        let pdu_id = self.append_pdu(&pdu, &pdu_json, globals, account_data, sending)?;
+        // Increment the last index and use that
+        // This is also the next_batch/since value
+        let count = globals.next_count()?;
+        let mut pdu_id = room_id.as_bytes().to_vec();
+        pdu_id.push(0xff);
+        pdu_id.extend_from_slice(&count.to_be_bytes());
 
+        // We append to state before appending the pdu, so we don't have a moment in time with the
+        // pdu without it's state. This is okay because append_pdu can't fail.
         self.append_to_state(&pdu_id, &pdu)?;
 
+        self.append_pdu(
+            &pdu,
+            &pdu_json,
+            count,
+            pdu_id.clone().into(),
+            globals,
+            account_data,
+            sending,
+        )?;
+
         for server in self
             .room_servers(room_id)
             .filter_map(|r| r.ok())
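The comment that moved into build_and_append_pdu above ("This is also the next_batch/since value") is why the key layout matters: with the count at the end of the key in big-endian form, "all PDUs after since" becomes a range scan over the ordered pduid_pdu tree, starting just past the key built from the since token. A toy in-memory model of that lookup (a BTreeMap and a &str room id standing in for the sled tree and RoomId; not Conduit's actual pdus_after implementation):

use std::collections::BTreeMap;
use std::ops::Bound;

// Toy stand-in for the pduid construction the diff performs at each call site.
fn pdu_id(room_id: &str, count: u64) -> Vec<u8> {
    let mut id = room_id.as_bytes().to_vec();
    id.push(0xff);
    id.extend_from_slice(&count.to_be_bytes());
    id
}

fn main() {
    let mut pduid_pdu = BTreeMap::new();
    for count in [1u64, 2, 3] {
        pduid_pdu.insert(pdu_id("!room:example.org", count), format!("pdu {}", count));
    }

    // "Everything after since = 1": a range scan starting just past that key.
    let since = 1u64;
    let range = (
        Bound::Excluded(pdu_id("!room:example.org", since)),
        Bound::Included(pdu_id("!room:example.org", u64::MAX)),
    );
    let after: Vec<_> = pduid_pdu.range(range).map(|(_, v)| v.as_str()).collect();
    assert_eq!(after, ["pdu 2", "pdu 3"]);
}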
@@ -389,9 +389,19 @@ pub fn send_transaction_message_route<'a>(
         let pdu = serde_json::from_value::<PduEvent>(value.clone())
             .expect("all ruma pdus are conduit pdus");
         if db.rooms.exists(&pdu.room_id)? {
-            let pdu_id =
-                db.rooms
-                    .append_pdu(&pdu, &value, &db.globals, &db.account_data, &db.sending)?;
+            let count = db.globals.next_count()?;
+            let mut pdu_id = pdu.room_id.as_bytes().to_vec();
+            pdu_id.push(0xff);
+            pdu_id.extend_from_slice(&count.to_be_bytes());
+            db.rooms.append_pdu(
+                &pdu,
+                &value,
+                count,
+                pdu_id.clone().into(),
+                &db.globals,
+                &db.account_data,
+                &db.sending,
+            )?;
             db.rooms.append_to_state(&pdu_id, &pdu)?;
         }
     }