fix: stop /messages at to

Fixes #150
This commit is contained in:
timokoesters 2020-07-26 17:34:12 +02:00
parent 20ab19d828
commit b7df8fe83c
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
2 changed files with 46 additions and 38 deletions

View file

@ -2763,7 +2763,7 @@ pub fn get_context_route(
.filter_map(|r| r.ok()) // Remove buggy events .filter_map(|r| r.ok()) // Remove buggy events
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let start_token = events_before.last().map_or(Ok(None), |e| { let start_token = events_before.last().map_or(Ok(None), |(_, e)| {
Ok::<_, Error>(Some( Ok::<_, Error>(Some(
db.rooms db.rooms
.get_pdu_count(&e.event_id)? .get_pdu_count(&e.event_id)?
@ -2774,7 +2774,7 @@ pub fn get_context_route(
let events_before = events_before let events_before = events_before
.into_iter() .into_iter()
.map(|pdu| pdu.to_room_event()) .map(|(_, pdu)| pdu.to_room_event())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let events_after = db let events_after = db
@ -2789,18 +2789,19 @@ pub fn get_context_route(
.filter_map(|r| r.ok()) // Remove buggy events .filter_map(|r| r.ok()) // Remove buggy events
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let end_token = events_after.last().map_or(Ok(None), |e| { let end_token = if let Some(last_event) = events_after.last() {
Ok::<_, Error>(Some( Some(
db.rooms utils::u64_from_bytes(&last_event.0)
.get_pdu_count(&e.event_id)? .map_err(|_| Error::bad_database("Invalid pdu id in db."))?
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))?
.to_string(), .to_string(),
)) )
})?; } else {
None
};
let events_after = events_after let events_after = events_after
.into_iter() .into_iter()
.map(|pdu| pdu.to_room_event()) .map(|(_, pdu)| pdu.to_room_event())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(get_context::Response { Ok(get_context::Response {
@ -2839,32 +2840,38 @@ pub fn get_message_events_route(
.clone() .clone()
.parse() .parse()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from` value."))?; .map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from` value."))?;
let to = body.to.as_ref().map(|t| t.as_bytes());
// Use limit or else 10
let limit = body let limit = body
.limit .limit
.try_into() .try_into()
.map_or(Ok::<_, Error>(10_usize), |l: u32| Ok(l as usize))?; .map_or(Ok::<_, Error>(10_usize), |l: u32| Ok(l as usize))?;
match body.dir { match body.dir {
get_message_events::Direction::Forward => { get_message_events::Direction::Forward => {
let events_after = db let events_after = db
.rooms .rooms
.pdus_after(&user_id, &body.room_id, from) .pdus_after(&user_id, &body.room_id, from)
// Use limit or else 10
.take(limit) .take(limit)
.filter_map(|r| r.ok()) // Filter out buggy events .filter_map(|r| r.ok()) // Filter out buggy events
.take_while(|(k, _)| Some(&**k) != to) // Stop at `to`
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let end_token = events_after.last().map_or(Ok::<_, Error>(None), |e| { let end_token = if let Some(last_event) = events_after.last() {
Ok(Some( Some(
db.rooms utils::u64_from_bytes(&last_event.0)
.get_pdu_count(&e.event_id)? .map_err(|_| Error::bad_database("Invalid pdu id in db."))?
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))?
.to_string(), .to_string(),
)) )
})?; } else {
None
};
let events_after = events_after let events_after = events_after
.into_iter() .into_iter()
.map(|pdu| pdu.to_room_event()) .map(|(_, pdu)| pdu.to_room_event())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(get_message_events::Response { Ok(get_message_events::Response {
@ -2879,23 +2886,24 @@ pub fn get_message_events_route(
let events_before = db let events_before = db
.rooms .rooms
.pdus_until(&user_id, &body.room_id, from) .pdus_until(&user_id, &body.room_id, from)
// Use limit or else 10
.take(limit) .take(limit)
.filter_map(|r| r.ok()) // Filter out buggy events .filter_map(|r| r.ok()) // Filter out buggy events
.take_while(|(k, _)| Some(&**k) != to) // Stop at `to`
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let start_token = events_before.last().map_or(Ok::<_, Error>(None), |e| { let start_token = if let Some(last_event) = events_before.last() {
Ok(Some( Some(
db.rooms utils::u64_from_bytes(&last_event.0)
.get_pdu_count(&e.event_id)? .map_err(|_| Error::bad_database("Invalid pdu id in db."))?
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))?
.to_string(), .to_string(),
)) )
})?; } else {
None
};
let events_before = events_before let events_before = events_before
.into_iter() .into_iter()
.map(|pdu| pdu.to_room_event()) .map(|(_, pdu)| pdu.to_room_event())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(get_message_events::Response { Ok(get_message_events::Response {

View file

@ -654,14 +654,14 @@ impl Rooms {
})) }))
} }
/// Returns an iterator over all events in a room that happened before the event with id /// Returns an iterator over all events and their tokens in a room that happened before the
/// `until` in reverse-chronological order. /// event with id `until` in reverse-chronological order.
pub fn pdus_until( pub fn pdus_until(
&self, &self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
until: u64, until: u64,
) -> impl Iterator<Item = Result<PduEvent>> { ) -> impl Iterator<Item = Result<(IVec, PduEvent)>> {
// Create the first part of the full pdu id // Create the first part of the full pdu id
let mut prefix = room_id.to_string().as_bytes().to_vec(); let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff); prefix.push(0xff);
@ -677,24 +677,24 @@ impl Rooms {
.rev() .rev()
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.take_while(move |(k, _)| k.starts_with(&prefix)) .take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(_, v)| { .map(move |(k, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v) let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id { if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id"); pdu.unsigned.remove("transaction_id");
} }
Ok(pdu) Ok((k, pdu))
}) })
} }
/// Returns an iterator over all events in a room that happened after the event with id /// Returns an iterator over all events and their token in a room that happened after the event
/// `from` in chronological order. /// with id `from` in chronological order.
pub fn pdus_after( pub fn pdus_after(
&self, &self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
from: u64, from: u64,
) -> impl Iterator<Item = Result<PduEvent>> { ) -> impl Iterator<Item = Result<(IVec, PduEvent)>> {
// Create the first part of the full pdu id // Create the first part of the full pdu id
let mut prefix = room_id.to_string().as_bytes().to_vec(); let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff); prefix.push(0xff);
@ -709,13 +709,13 @@ impl Rooms {
.range(current..) .range(current..)
.filter_map(|r| r.ok()) .filter_map(|r| r.ok())
.take_while(move |(k, _)| k.starts_with(&prefix)) .take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(_, v)| { .map(move |(k, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v) let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?; .map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id { if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id"); pdu.unsigned.remove("transaction_id");
} }
Ok(pdu) Ok((k, pdu))
}) })
} }