message: keep MessageQueue sorted by origin_server_ts
This commit is contained in:
parent
178c6c06f8
commit
9386b500a8
1 changed files with 86 additions and 44 deletions
|
@ -1,43 +1,52 @@
|
|||
use std::collections::{vec_deque::IntoIter, VecDeque};
|
||||
use std::cmp::Ordering;
|
||||
use std::ops::Deref;
|
||||
use std::vec::IntoIter;
|
||||
|
||||
use crate::events::collections::all::RoomEvent;
|
||||
use crate::events::room::message::MessageEvent;
|
||||
use crate::events::EventJson;
|
||||
|
||||
use serde::{de, ser};
|
||||
|
||||
pub(crate) mod ser_deser {
|
||||
use super::*;
|
||||
|
||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<MessageQueue, D::Error>
|
||||
where
|
||||
D: de::Deserializer<'de>,
|
||||
{
|
||||
let messages: VecDeque<EventJson<RoomEvent>> = de::Deserialize::deserialize(deserializer)?;
|
||||
|
||||
// TODO this should probably bail out if deserialization fails not skip
|
||||
let msgs: VecDeque<RoomEvent> = messages
|
||||
.into_iter()
|
||||
.flat_map(|json| json.deserialize())
|
||||
.collect();
|
||||
|
||||
Ok(MessageQueue { msgs })
|
||||
}
|
||||
|
||||
pub fn serialize<S>(msgs: &MessageQueue, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: ser::Serializer,
|
||||
{
|
||||
use ser::Serialize;
|
||||
|
||||
msgs.msgs.serialize(serializer)
|
||||
}
|
||||
}
|
||||
use serde::{de, ser, Serialize};
|
||||
|
||||
/// A queue that holds at most 10 messages received from the server.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct MessageQueue {
|
||||
msgs: VecDeque<RoomEvent>,
|
||||
msgs: Vec<MessageWrapper>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct MessageWrapper(MessageEvent);
|
||||
|
||||
impl Deref for MessageWrapper {
|
||||
type Target = MessageEvent;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for MessageWrapper {
|
||||
fn eq(&self, other: &MessageWrapper) -> bool {
|
||||
self.0.event_id == other.0.event_id
|
||||
&& self.0.room_id == other.0.room_id
|
||||
&& self.0.origin_server_ts == other.0.origin_server_ts
|
||||
&& self.0.sender == other.0.sender
|
||||
&& self.0.content == other.0.content
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for MessageWrapper {}
|
||||
|
||||
impl PartialOrd for MessageWrapper {
|
||||
fn partial_cmp(&self, other: &MessageWrapper) -> Option<Ordering> {
|
||||
Some(self.0.origin_server_ts.cmp(&other.0.origin_server_ts))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for MessageWrapper {
|
||||
fn cmp(&self, other: &MessageWrapper) -> Ordering {
|
||||
self.partial_cmp(other).unwrap_or(Ordering::Equal)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for MessageQueue {
|
||||
|
@ -47,12 +56,7 @@ impl PartialEq for MessageQueue {
|
|||
.msgs
|
||||
.iter()
|
||||
.zip(other.msgs.iter())
|
||||
.all(|(a, b)| match (a, b) {
|
||||
(RoomEvent::RoomMessage(msg_a), RoomEvent::RoomMessage(msg_b)) => {
|
||||
msg_a.event_id == msg_b.event_id
|
||||
}
|
||||
_ => false,
|
||||
})
|
||||
.all(|(msg_a, msg_b)| msg_a.event_id == msg_b.event_id)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,28 +64,32 @@ impl MessageQueue {
|
|||
/// Create a new empty `MessageQueue`.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
msgs: VecDeque::with_capacity(20),
|
||||
msgs: Vec::with_capacity(20),
|
||||
}
|
||||
}
|
||||
|
||||
/// Appends a `MessageEvent` to the end of the `MessageQueue`.
|
||||
/// Inserts a `MessageEvent` into `MessageQueue`, sorted by by `origin_server_ts`.
|
||||
///
|
||||
/// Removes the oldest element in the queue if there are more than 10 elements.
|
||||
pub fn push(&mut self, msg: MessageEvent) -> bool {
|
||||
self.msgs.push_back(RoomEvent::RoomMessage(msg));
|
||||
let message = MessageWrapper(msg);
|
||||
match self.msgs.binary_search_by(|m| m.cmp(&message)) {
|
||||
Ok(pos) => self.msgs.insert(pos, message),
|
||||
Err(pos) => self.msgs.insert(pos, message),
|
||||
}
|
||||
if self.msgs.len() > 10 {
|
||||
self.msgs.pop_front();
|
||||
self.msgs.remove(0);
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = &RoomEvent> {
|
||||
pub fn iter(&self) -> impl Iterator<Item = &MessageWrapper> {
|
||||
self.msgs.iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoIterator for MessageQueue {
|
||||
type Item = RoomEvent;
|
||||
type Item = MessageWrapper;
|
||||
type IntoIter = IntoIter<Self::Item>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
|
@ -89,6 +97,40 @@ impl IntoIterator for MessageQueue {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) mod ser_deser {
|
||||
use super::*;
|
||||
|
||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<MessageQueue, D::Error>
|
||||
where
|
||||
D: de::Deserializer<'de>,
|
||||
{
|
||||
let messages: Vec<EventJson<RoomEvent>> = de::Deserialize::deserialize(deserializer)?;
|
||||
|
||||
// TODO this should probably bail out if deserialization fails not skip
|
||||
let msgs: Vec<MessageWrapper> = messages
|
||||
.into_iter()
|
||||
.flat_map(|json| json.deserialize())
|
||||
.flat_map(|ev| {
|
||||
if let RoomEvent::RoomMessage(msg) = ev {
|
||||
Some(msg)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(MessageWrapper)
|
||||
.collect();
|
||||
|
||||
Ok(MessageQueue { msgs })
|
||||
}
|
||||
|
||||
pub fn serialize<S>(msgs: &MessageQueue, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: ser::Serializer,
|
||||
{
|
||||
msgs.msgs.serialize(serializer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
@ -98,7 +140,7 @@ mod test {
|
|||
|
||||
use crate::events::{collections::all::RoomEvent, EventJson};
|
||||
use crate::identifiers::{RoomId, UserId};
|
||||
use crate::{state::ClientState, Room};
|
||||
use crate::Room;
|
||||
|
||||
#[test]
|
||||
fn serialize() {
|
||||
|
|
Loading…
Reference in a new issue