New PduEvent struct
parent
f9cfede2a8
commit
8b8381bcc0
|
@ -505,6 +505,7 @@ dependencies = [
|
||||||
"ruma-federation-api",
|
"ruma-federation-api",
|
||||||
"ruma-identifiers",
|
"ruma-identifiers",
|
||||||
"ruma-signatures",
|
"ruma-signatures",
|
||||||
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sled",
|
"sled",
|
||||||
]
|
]
|
||||||
|
|
|
@ -21,3 +21,4 @@ js_int = "0.1.4"
|
||||||
serde_json = "1.0.50"
|
serde_json = "1.0.50"
|
||||||
ruma-signatures = { git = "https://github.com/ruma/ruma-signatures.git" }
|
ruma-signatures = { git = "https://github.com/ruma/ruma-signatures.git" }
|
||||||
ruma-federation-api = "0.0.1"
|
ruma-federation-api = "0.0.1"
|
||||||
|
serde = "1.0.105"
|
||||||
|
|
55
src/data.rs
55
src/data.rs
|
@ -1,9 +1,12 @@
|
||||||
use crate::{utils, Database};
|
use crate::{utils, Database, PduEvent};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use ruma_events::collections::all::Event;
|
use ruma_events::{room::message::MessageEvent, EventType};
|
||||||
use ruma_federation_api::RoomV3Pdu;
|
use ruma_federation_api::RoomV3Pdu;
|
||||||
use ruma_identifiers::{EventId, RoomId, UserId};
|
use ruma_identifiers::{EventId, RoomId, UserId};
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
convert::{TryFrom, TryInto},
|
||||||
|
};
|
||||||
|
|
||||||
pub struct Data {
|
pub struct Data {
|
||||||
hostname: String,
|
hostname: String,
|
||||||
|
@ -145,7 +148,7 @@ impl Data {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a persisted data unit from this homeserver
|
/// Add a persisted data unit from this homeserver
|
||||||
pub fn pdu_append(&self, event_id: &EventId, room_id: &RoomId, event: Event) {
|
pub fn pdu_append_message(&self, event_id: &EventId, room_id: &RoomId, event: MessageEvent) {
|
||||||
// prev_events are the leaves of the current graph. This method removes all leaves from the
|
// prev_events are the leaves of the current graph. This method removes all leaves from the
|
||||||
// room and replaces them with our event
|
// room and replaces them with our event
|
||||||
let prev_events = self.pdu_leaves_replace(room_id, event_id);
|
let prev_events = self.pdu_leaves_replace(room_id, event_id);
|
||||||
|
@ -163,25 +166,25 @@ impl Data {
|
||||||
.unwrap_or(0_u64)
|
.unwrap_or(0_u64)
|
||||||
+ 1;
|
+ 1;
|
||||||
|
|
||||||
let mut pdu_value = serde_json::to_value(&event).expect("message event can be serialized");
|
let pdu = PduEvent {
|
||||||
let pdu = pdu_value.as_object_mut().unwrap();
|
event_id: event_id.clone(),
|
||||||
|
room_id: room_id.clone(),
|
||||||
pdu.insert(
|
sender: event.sender,
|
||||||
"prev_events".to_owned(),
|
origin: self.hostname.clone(),
|
||||||
prev_events
|
origin_server_ts: event.origin_server_ts,
|
||||||
.iter()
|
kind: EventType::RoomMessage,
|
||||||
.map(|id| id.to_string())
|
content: serde_json::to_value(event.content).unwrap(),
|
||||||
.collect::<Vec<_>>()
|
state_key: None,
|
||||||
.into(),
|
prev_events,
|
||||||
);
|
depth: depth.try_into().unwrap(),
|
||||||
pdu.insert("origin".to_owned(), self.hostname().into());
|
auth_events: Vec::new(),
|
||||||
pdu.insert("depth".to_owned(), depth.into());
|
redacts: None,
|
||||||
pdu.insert("auth_events".to_owned(), vec!["$auth_eventid"].into()); // TODO
|
unsigned: Default::default(),
|
||||||
pdu.insert(
|
hashes: ruma_federation_api::EventHash {
|
||||||
"hashes".to_owned(),
|
sha256: "aaa".to_owned(),
|
||||||
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".into(),
|
},
|
||||||
); // TODO
|
signatures: HashMap::new(),
|
||||||
pdu.insert("signatures".to_owned(), "signature".into()); // TODO
|
};
|
||||||
|
|
||||||
// The new value will need a new index. We store the last used index in 'n' + id
|
// The new value will need a new index. We store the last used index in 'n' + id
|
||||||
let mut count_key: Vec<u8> = vec![b'n'];
|
let mut count_key: Vec<u8> = vec![b'n'];
|
||||||
|
@ -205,7 +208,7 @@ impl Data {
|
||||||
|
|
||||||
self.db
|
self.db
|
||||||
.pduid_pdus
|
.pduid_pdus
|
||||||
.insert(&pdu_id, dbg!(&*serde_json::to_string(&pdu).unwrap()))
|
.insert(&pdu_id, &*serde_json::to_string(&pdu).unwrap())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
self.db
|
self.db
|
||||||
|
@ -215,7 +218,7 @@ impl Data {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a vector of all PDUs.
|
/// Returns a vector of all PDUs.
|
||||||
pub fn pdus_all(&self) -> Vec<RoomV3Pdu> {
|
pub fn pdus_all(&self) -> Vec<PduEvent> {
|
||||||
self.pdus_since(
|
self.pdus_since(
|
||||||
self.db
|
self.db
|
||||||
.eventid_pduid
|
.eventid_pduid
|
||||||
|
@ -229,7 +232,7 @@ impl Data {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a vector of all events that happened after the event with id `since`.
|
/// Returns a vector of all events that happened after the event with id `since`.
|
||||||
pub fn pdus_since(&self, since: String) -> Vec<RoomV3Pdu> {
|
pub fn pdus_since(&self, since: String) -> Vec<PduEvent> {
|
||||||
let mut pdus = Vec::new();
|
let mut pdus = Vec::new();
|
||||||
|
|
||||||
if let Some(room_id) = since.rsplitn(2, '#').nth(1) {
|
if let Some(room_id) = since.rsplitn(2, '#').nth(1) {
|
||||||
|
|
28
src/main.rs
28
src/main.rs
|
@ -1,13 +1,15 @@
|
||||||
#![feature(proc_macro_hygiene, decl_macro)]
|
#![feature(proc_macro_hygiene, decl_macro)]
|
||||||
mod data;
|
mod data;
|
||||||
mod database;
|
mod database;
|
||||||
|
mod pdu;
|
||||||
mod ruma_wrapper;
|
mod ruma_wrapper;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
pub use data::Data;
|
pub use data::Data;
|
||||||
pub use database::Database;
|
pub use database::Database;
|
||||||
|
pub use pdu::PduEvent;
|
||||||
|
|
||||||
use log::debug;
|
use log::{debug, error};
|
||||||
use rocket::{get, options, post, put, routes, State};
|
use rocket::{get, options, post, put, routes, State};
|
||||||
use ruma_client_api::{
|
use ruma_client_api::{
|
||||||
error::{Error, ErrorKind},
|
error::{Error, ErrorKind},
|
||||||
|
@ -17,7 +19,7 @@ use ruma_client_api::{
|
||||||
},
|
},
|
||||||
unversioned::get_supported_versions,
|
unversioned::get_supported_versions,
|
||||||
};
|
};
|
||||||
use ruma_events::{collections::all::Event, room::message::MessageEvent};
|
use ruma_events::{collections::all::RoomEvent, room::message::MessageEvent, EventResult};
|
||||||
use ruma_identifiers::{EventId, UserId};
|
use ruma_identifiers::{EventId, UserId};
|
||||||
use ruma_wrapper::{MatrixResult, Ruma};
|
use ruma_wrapper::{MatrixResult, Ruma};
|
||||||
use serde_json::map::Map;
|
use serde_json::map::Map;
|
||||||
|
@ -212,7 +214,7 @@ fn create_message_event_route(
|
||||||
body: Ruma<create_message_event::Request>,
|
body: Ruma<create_message_event::Request>,
|
||||||
) -> MatrixResult<create_message_event::Response> {
|
) -> MatrixResult<create_message_event::Response> {
|
||||||
// Construct event
|
// Construct event
|
||||||
let mut event = Event::RoomMessage(MessageEvent {
|
let mut event = RoomEvent::RoomMessage(MessageEvent {
|
||||||
content: body.data.clone().into_result().unwrap(),
|
content: body.data.clone().into_result().unwrap(),
|
||||||
event_id: EventId::try_from("$thiswillbefilledinlater").unwrap(),
|
event_id: EventId::try_from("$thiswillbefilledinlater").unwrap(),
|
||||||
origin_server_ts: utils::millis_since_unix_epoch(),
|
origin_server_ts: utils::millis_since_unix_epoch(),
|
||||||
|
@ -230,13 +232,13 @@ fn create_message_event_route(
|
||||||
.expect("ruma's reference hashes are correct");
|
.expect("ruma's reference hashes are correct");
|
||||||
|
|
||||||
// Insert event id
|
// Insert event id
|
||||||
if let Event::RoomMessage(message) = &mut event {
|
if let RoomEvent::RoomMessage(message) = &mut event {
|
||||||
message.event_id = event_id.clone();
|
message.event_id = event_id.clone();
|
||||||
|
data.pdu_append_message(&event_id, &body.room_id, message.clone());
|
||||||
|
} else {
|
||||||
|
error!("only roommessages are handled currently");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add PDU to the graph
|
|
||||||
data.pdu_append(&event_id, &body.room_id, event);
|
|
||||||
|
|
||||||
MatrixResult(Ok(create_message_event::Response { event_id }))
|
MatrixResult(Ok(create_message_event::Response { event_id }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,8 +247,15 @@ fn sync_route(
|
||||||
data: State<Data>,
|
data: State<Data>,
|
||||||
body: Ruma<sync_events::Request>,
|
body: Ruma<sync_events::Request>,
|
||||||
) -> MatrixResult<sync_events::Response> {
|
) -> MatrixResult<sync_events::Response> {
|
||||||
let pdus = data.pdus_all();
|
|
||||||
let mut joined_rooms = HashMap::new();
|
let mut joined_rooms = HashMap::new();
|
||||||
|
{
|
||||||
|
let pdus = data.pdus_all();
|
||||||
|
let mut room_events = Vec::new();
|
||||||
|
|
||||||
|
for pdu in pdus {
|
||||||
|
room_events.push(pdu.to_room_event());
|
||||||
|
}
|
||||||
|
|
||||||
joined_rooms.insert(
|
joined_rooms.insert(
|
||||||
"!roomid:localhost".try_into().unwrap(),
|
"!roomid:localhost".try_into().unwrap(),
|
||||||
sync_events::JoinedRoom {
|
sync_events::JoinedRoom {
|
||||||
|
@ -263,12 +272,13 @@ fn sync_route(
|
||||||
timeline: sync_events::Timeline {
|
timeline: sync_events::Timeline {
|
||||||
limited: None,
|
limited: None,
|
||||||
prev_batch: None,
|
prev_batch: None,
|
||||||
events: todo!(),
|
events: room_events,
|
||||||
},
|
},
|
||||||
state: sync_events::State { events: Vec::new() },
|
state: sync_events::State { events: Vec::new() },
|
||||||
ephemeral: sync_events::Ephemeral { events: Vec::new() },
|
ephemeral: sync_events::Ephemeral { events: Vec::new() },
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
MatrixResult(Ok(sync_events::Response {
|
MatrixResult(Ok(sync_events::Response {
|
||||||
next_batch: String::new(),
|
next_batch: String::new(),
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
use js_int::UInt;
|
||||||
|
use ruma_events::{collections::all::RoomEvent, EventResult, EventType};
|
||||||
|
use ruma_federation_api::EventHash;
|
||||||
|
use ruma_identifiers::{EventId, RoomId, UserId};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
pub struct PduEvent {
|
||||||
|
pub event_id: EventId,
|
||||||
|
pub room_id: RoomId,
|
||||||
|
pub sender: UserId,
|
||||||
|
pub origin: String,
|
||||||
|
pub origin_server_ts: UInt,
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub kind: EventType,
|
||||||
|
pub content: serde_json::Value,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub state_key: Option<String>,
|
||||||
|
pub prev_events: Vec<EventId>,
|
||||||
|
pub depth: UInt,
|
||||||
|
pub auth_events: Vec<EventId>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub redacts: Option<EventId>,
|
||||||
|
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
|
||||||
|
pub unsigned: serde_json::Map<String, serde_json::Value>,
|
||||||
|
pub hashes: EventHash,
|
||||||
|
pub signatures: HashMap<String, HashMap<String, String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PduEvent {
|
||||||
|
pub fn to_room_event(&self) -> RoomEvent {
|
||||||
|
// Can only fail in rare circumstances that won't ever happen here, see
|
||||||
|
// https://docs.rs/serde_json/1.0.50/serde_json/fn.to_string.html
|
||||||
|
let json = serde_json::to_string(&self).unwrap();
|
||||||
|
// EventResult's deserialize implementation always returns `Ok(...)`
|
||||||
|
serde_json::from_str::<EventResult<RoomEvent>>(&json)
|
||||||
|
.unwrap()
|
||||||
|
.into_result()
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue