fix: stuck messages

Conduit did not send the event in /sync because of a race condition.
There is a brief moment in time where Conduit has accepted the event but
has not stored it yet. If a client /syncs during that window, it updates
its since token without ever receiving the event, so the event is skipped
by every later incremental sync. I hope I fixed it by significantly
shortening the race-condition window.
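
To make the ordering concrete, here is a minimal sketch of the race in plain Rust. It is not Conduit's actual code: an in-memory BTreeMap and an AtomicU64 stand in for the pduid_pdu tree and globals.next_count(), the raw count stands in for the pdu_id byte vector, and /sync is reduced to "return everything newer than since". The only thing it demonstrates is the ordering: the since token is derived from the global count, so any gap between bumping the count and inserting the PDU is a window in which a concurrent /sync can step past the event without returning it.

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

// Stand-in for globals.next_count(): a monotonically increasing counter that
// also serves as the /sync since token.
static NEXT_COUNT: AtomicU64 = AtomicU64::new(0);

struct Rooms {
    // count -> event id, standing in for the pduid_pdu tree
    pduid_pdu: Mutex<BTreeMap<u64, String>>,
}

impl Rooms {
    fn append_pdu(&self, event_id: &str) -> u64 {
        // Before this commit the count was taken earlier, in append_pdu's
        // callers, so read receipts, notification counts and (in one caller)
        // state handling all sat inside the race window. Taking it right
        // before the insert shrinks the window to these two statements.
        let count = NEXT_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
        self.pduid_pdu
            .lock()
            .unwrap()
            .insert(count, event_id.to_owned());
        count // the sketch's equivalent of the pdu_id append_pdu now returns
    }

    // /sync: everything newer than `since`, plus the next since token.
    fn sync(&self, since: u64) -> (Vec<String>, u64) {
        let next_since = NEXT_COUNT.load(Ordering::SeqCst);
        let events = self
            .pduid_pdu
            .lock()
            .unwrap()
            .range(since + 1..)
            .map(|(_, id)| id.clone())
            .collect();
        (events, next_since)
    }
}

fn main() {
    let rooms = Rooms {
        pduid_pdu: Mutex::new(BTreeMap::new()),
    };
    rooms.append_pdu("$example_event");
    let (events, since) = rooms.sync(0);
    // If sync() had run between the count bump and the insert, `since` would
    // already be 1 while `events` was still empty; the next sync then starts
    // after 1 and the event never arrives: a "stuck" message.
    println!("events: {:?}, since: {}", events, since);
}

Note that the commit does not remove the window entirely (the new comment in append_pdu still acknowledges it); it shrinks it from spanning state handling, read receipts and notification counts down to two adjacent statements, which makes the lost-event case far rarer.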
Timo Kösters 2021-07-01 11:06:05 +02:00
parent 8c44312485
commit 1c25492a7e
4 changed files with 19 additions and 25 deletions

@@ -619,16 +619,9 @@ async fn join_room_by_id_helper(
         // pdu without it's state. This is okay because append_pdu can't fail.
         let statehashid = db.rooms.append_to_state(&pdu, &db.globals)?;
-        let count = db.globals.next_count()?;
-        let mut pdu_id = room_id.as_bytes().to_vec();
-        pdu_id.push(0xff);
-        pdu_id.extend_from_slice(&count.to_be_bytes());
         db.rooms.append_pdu(
             &pdu,
             utils::to_canonical_object(&pdu).expect("Pdu is valid canonical object"),
-            count,
-            &pdu_id,
             &[pdu.event_id.clone()],
             db,
         )?;

@@ -666,11 +666,10 @@ impl Rooms {
         &self,
         pdu: &PduEvent,
         mut pdu_json: CanonicalJsonObject,
-        count: u64,
-        pdu_id: &[u8],
         leaves: &[EventId],
         db: &Database,
-    ) -> Result<()> {
+    ) -> Result<Vec<u8>> {
+        // returns pdu id
         // Make unsigned fields correct. This is not properly documented in the spec, but state
         // events need to have previous content in the unsigned field, so clients can easily
         // interpret things like membership changes
@@ -708,20 +707,30 @@ impl Rooms {
         self.replace_pdu_leaves(&pdu.room_id, leaves)?;
+        let count1 = db.globals.next_count()?;
         // Mark as read first so the sending client doesn't get a notification even if appending
         // fails
         self.edus
-            .private_read_set(&pdu.room_id, &pdu.sender, count, &db.globals)?;
+            .private_read_set(&pdu.room_id, &pdu.sender, count1, &db.globals)?;
         self.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
+        let count2 = db.globals.next_count()?;
+        let mut pdu_id = pdu.room_id.as_bytes().to_vec();
+        pdu_id.push(0xff);
+        pdu_id.extend_from_slice(&count2.to_be_bytes());
+        // There's a brief moment of time here where the count is updated but the pdu does not
+        // exist. This could theoretically lead to dropped pdus, but it's extremely rare
         self.pduid_pdu.insert(
-            pdu_id,
+            &pdu_id,
             &serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid"),
         )?;
         // This also replaces the eventid of any outliers with the correct
         // pduid, removing the place holder.
-        self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?;
+        self.eventid_pduid
+            .insert(pdu.event_id.as_bytes(), &pdu_id)?;
         // See if the event matches any known pushers
         for user in db
@@ -909,7 +918,7 @@ impl Rooms {
             _ => {}
         }
-        Ok(())
+        Ok(pdu_id)
     }

    pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
@@ -1354,11 +1363,9 @@ impl Rooms {
         // pdu without it's state. This is okay because append_pdu can't fail.
         let statehashid = self.append_to_state(&pdu, &db.globals)?;
-        self.append_pdu(
+        let pdu_id = self.append_pdu(
             &pdu,
             pdu_json,
-            count,
-            &pdu_id,
             // Since this PDU references all pdu_leaves we can update the leaves
             // of the room
             &[pdu.event_id.clone()],

@@ -320,6 +320,7 @@ where
             }),
             Err(e) => {
                 warn!("{:?}", e);
+                // Bad Json
                 Failure((Status::new(583), ()))
             }
         }

@@ -1579,21 +1579,14 @@ pub(crate) fn append_incoming_pdu(
     new_room_leaves: HashSet<EventId>,
     state: &StateMap<Arc<PduEvent>>,
 ) -> Result<Vec<u8>> {
-    let count = db.globals.next_count()?;
-    let mut pdu_id = pdu.room_id.as_bytes().to_vec();
-    pdu_id.push(0xff);
-    pdu_id.extend_from_slice(&count.to_be_bytes());
     // We append to state before appending the pdu, so we don't have a moment in time with the
     // pdu without it's state. This is okay because append_pdu can't fail.
     db.rooms
         .set_event_state(&pdu.event_id, state, &db.globals)?;
-    db.rooms.append_pdu(
+    let pdu_id = db.rooms.append_pdu(
         pdu,
         pdu_json,
-        count,
-        &pdu_id,
         &new_room_leaves.into_iter().collect::<Vec<_>>(),
         &db,
     )?;