Merge pull request 'fix: stop /messages at `to`' (#159) from fix-to into master
Reviewed-on: https://git.koesters.xyz/timo/conduit/pulls/159next
commit
c3d142ad28
|
@ -2763,7 +2763,7 @@ pub fn get_context_route(
|
|||
.filter_map(|r| r.ok()) // Remove buggy events
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let start_token = events_before.last().map_or(Ok(None), |e| {
|
||||
let start_token = events_before.last().map_or(Ok(None), |(_, e)| {
|
||||
Ok::<_, Error>(Some(
|
||||
db.rooms
|
||||
.get_pdu_count(&e.event_id)?
|
||||
|
@ -2774,7 +2774,7 @@ pub fn get_context_route(
|
|||
|
||||
let events_before = events_before
|
||||
.into_iter()
|
||||
.map(|pdu| pdu.to_room_event())
|
||||
.map(|(_, pdu)| pdu.to_room_event())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let events_after = db
|
||||
|
@ -2789,18 +2789,19 @@ pub fn get_context_route(
|
|||
.filter_map(|r| r.ok()) // Remove buggy events
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let end_token = events_after.last().map_or(Ok(None), |e| {
|
||||
Ok::<_, Error>(Some(
|
||||
db.rooms
|
||||
.get_pdu_count(&e.event_id)?
|
||||
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))?
|
||||
let end_token = if let Some(last_event) = events_after.last() {
|
||||
Some(
|
||||
utils::u64_from_bytes(&last_event.0)
|
||||
.map_err(|_| Error::bad_database("Invalid pdu id in db."))?
|
||||
.to_string(),
|
||||
))
|
||||
})?;
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let events_after = events_after
|
||||
.into_iter()
|
||||
.map(|pdu| pdu.to_room_event())
|
||||
.map(|(_, pdu)| pdu.to_room_event())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(get_context::Response {
|
||||
|
@ -2839,32 +2840,38 @@ pub fn get_message_events_route(
|
|||
.clone()
|
||||
.parse()
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from` value."))?;
|
||||
|
||||
let to = body.to.as_ref().map(|t| t.as_bytes());
|
||||
|
||||
// Use limit or else 10
|
||||
let limit = body
|
||||
.limit
|
||||
.try_into()
|
||||
.map_or(Ok::<_, Error>(10_usize), |l: u32| Ok(l as usize))?;
|
||||
|
||||
match body.dir {
|
||||
get_message_events::Direction::Forward => {
|
||||
let events_after = db
|
||||
.rooms
|
||||
.pdus_after(&user_id, &body.room_id, from)
|
||||
// Use limit or else 10
|
||||
.take(limit)
|
||||
.filter_map(|r| r.ok()) // Filter out buggy events
|
||||
.take_while(|(k, _)| Some(&**k) != to) // Stop at `to`
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let end_token = events_after.last().map_or(Ok::<_, Error>(None), |e| {
|
||||
Ok(Some(
|
||||
db.rooms
|
||||
.get_pdu_count(&e.event_id)?
|
||||
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))?
|
||||
let end_token = if let Some(last_event) = events_after.last() {
|
||||
Some(
|
||||
utils::u64_from_bytes(&last_event.0)
|
||||
.map_err(|_| Error::bad_database("Invalid pdu id in db."))?
|
||||
.to_string(),
|
||||
))
|
||||
})?;
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let events_after = events_after
|
||||
.into_iter()
|
||||
.map(|pdu| pdu.to_room_event())
|
||||
.map(|(_, pdu)| pdu.to_room_event())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(get_message_events::Response {
|
||||
|
@ -2879,23 +2886,24 @@ pub fn get_message_events_route(
|
|||
let events_before = db
|
||||
.rooms
|
||||
.pdus_until(&user_id, &body.room_id, from)
|
||||
// Use limit or else 10
|
||||
.take(limit)
|
||||
.filter_map(|r| r.ok()) // Filter out buggy events
|
||||
.take_while(|(k, _)| Some(&**k) != to) // Stop at `to`
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let start_token = events_before.last().map_or(Ok::<_, Error>(None), |e| {
|
||||
Ok(Some(
|
||||
db.rooms
|
||||
.get_pdu_count(&e.event_id)?
|
||||
.ok_or_else(|| Error::bad_database("Can't find count from event in db."))?
|
||||
let start_token = if let Some(last_event) = events_before.last() {
|
||||
Some(
|
||||
utils::u64_from_bytes(&last_event.0)
|
||||
.map_err(|_| Error::bad_database("Invalid pdu id in db."))?
|
||||
.to_string(),
|
||||
))
|
||||
})?;
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let events_before = events_before
|
||||
.into_iter()
|
||||
.map(|pdu| pdu.to_room_event())
|
||||
.map(|(_, pdu)| pdu.to_room_event())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(get_message_events::Response {
|
||||
|
|
|
@ -654,14 +654,14 @@ impl Rooms {
|
|||
}))
|
||||
}
|
||||
|
||||
/// Returns an iterator over all events in a room that happened before the event with id
|
||||
/// `until` in reverse-chronological order.
|
||||
/// Returns an iterator over all events and their tokens in a room that happened before the
|
||||
/// event with id `until` in reverse-chronological order.
|
||||
pub fn pdus_until(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
until: u64,
|
||||
) -> impl Iterator<Item = Result<PduEvent>> {
|
||||
) -> impl Iterator<Item = Result<(IVec, PduEvent)>> {
|
||||
// Create the first part of the full pdu id
|
||||
let mut prefix = room_id.to_string().as_bytes().to_vec();
|
||||
prefix.push(0xff);
|
||||
|
@ -677,24 +677,24 @@ impl Rooms {
|
|||
.rev()
|
||||
.filter_map(|r| r.ok())
|
||||
.take_while(move |(k, _)| k.starts_with(&prefix))
|
||||
.map(move |(_, v)| {
|
||||
.map(move |(k, v)| {
|
||||
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
|
||||
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
|
||||
if pdu.sender != user_id {
|
||||
pdu.unsigned.remove("transaction_id");
|
||||
}
|
||||
Ok(pdu)
|
||||
Ok((k, pdu))
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns an iterator over all events in a room that happened after the event with id
|
||||
/// `from` in chronological order.
|
||||
/// Returns an iterator over all events and their token in a room that happened after the event
|
||||
/// with id `from` in chronological order.
|
||||
pub fn pdus_after(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
from: u64,
|
||||
) -> impl Iterator<Item = Result<PduEvent>> {
|
||||
) -> impl Iterator<Item = Result<(IVec, PduEvent)>> {
|
||||
// Create the first part of the full pdu id
|
||||
let mut prefix = room_id.to_string().as_bytes().to_vec();
|
||||
prefix.push(0xff);
|
||||
|
@ -709,13 +709,13 @@ impl Rooms {
|
|||
.range(current..)
|
||||
.filter_map(|r| r.ok())
|
||||
.take_while(move |(k, _)| k.starts_with(&prefix))
|
||||
.map(move |(_, v)| {
|
||||
.map(move |(k, v)| {
|
||||
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
|
||||
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
|
||||
if pdu.sender != user_id {
|
||||
pdu.unsigned.remove("transaction_id");
|
||||
}
|
||||
Ok(pdu)
|
||||
Ok((k, pdu))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue