Merge branch 'receipt-and-txnid' into 'master'
/receipt and transaction ids for federated requests Closes #11 See merge request famedly/conduit!27
This commit is contained in:
		
						commit
						566b8ebabb
					
				
					 3 changed files with 63 additions and 9 deletions
				
			
		|  | @ -2,7 +2,8 @@ use super::State; | ||||||
| use crate::{ConduitResult, Database, Error, Ruma}; | use crate::{ConduitResult, Database, Error, Ruma}; | ||||||
| use ruma::{ | use ruma::{ | ||||||
|     api::client::{ |     api::client::{ | ||||||
|         error::ErrorKind, r0::capabilities::get_capabilities, r0::read_marker::set_read_marker, |         error::ErrorKind, | ||||||
|  |         r0::{read_marker::set_read_marker, receipt::create_receipt}, | ||||||
|     }, |     }, | ||||||
|     events::{AnyEphemeralRoomEvent, AnyEvent, EventType}, |     events::{AnyEphemeralRoomEvent, AnyEvent, EventType}, | ||||||
| }; | }; | ||||||
|  | @ -83,13 +84,52 @@ pub async fn set_read_marker_route( | ||||||
|     feature = "conduit_bin", |     feature = "conduit_bin", | ||||||
|     post("/_matrix/client/r0/rooms/<_>/receipt/<_>/<_>", data = "<body>") |     post("/_matrix/client/r0/rooms/<_>/receipt/<_>/<_>", data = "<body>") | ||||||
| )] | )] | ||||||
| pub async fn set_receipt_route( | pub async fn create_receipt_route( | ||||||
|     db: State<'_, Database>, |     db: State<'_, Database>, | ||||||
|     body: Ruma<get_capabilities::Request>, |     body: Ruma<create_receipt::Request<'_>>, | ||||||
| ) -> ConduitResult<set_read_marker::Response> { | ) -> ConduitResult<create_receipt::Response> { | ||||||
|     let _sender_user = body.sender_user.as_ref().expect("user is authenticated"); |     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||||
|  | 
 | ||||||
|  |     db.rooms.edus.private_read_set( | ||||||
|  |         &body.room_id, | ||||||
|  |         &sender_user, | ||||||
|  |         db.rooms | ||||||
|  |             .get_pdu_count(&body.event_id)? | ||||||
|  |             .ok_or(Error::BadRequest( | ||||||
|  |                 ErrorKind::InvalidParam, | ||||||
|  |                 "Event does not exist.", | ||||||
|  |             ))?, | ||||||
|  |         &db.globals, | ||||||
|  |     )?; | ||||||
|  | 
 | ||||||
|  |     let mut user_receipts = BTreeMap::new(); | ||||||
|  |     user_receipts.insert( | ||||||
|  |         sender_user.clone(), | ||||||
|  |         ruma::events::receipt::Receipt { | ||||||
|  |             ts: Some(SystemTime::now()), | ||||||
|  |         }, | ||||||
|  |     ); | ||||||
|  |     let mut receipt_content = BTreeMap::new(); | ||||||
|  |     receipt_content.insert( | ||||||
|  |         body.event_id.to_owned(), | ||||||
|  |         ruma::events::receipt::Receipts { | ||||||
|  |             read: Some(user_receipts), | ||||||
|  |         }, | ||||||
|  |     ); | ||||||
|  | 
 | ||||||
|  |     db.rooms.edus.readreceipt_update( | ||||||
|  |         &sender_user, | ||||||
|  |         &body.room_id, | ||||||
|  |         AnyEvent::Ephemeral(AnyEphemeralRoomEvent::Receipt( | ||||||
|  |             ruma::events::receipt::ReceiptEvent { | ||||||
|  |                 content: ruma::events::receipt::ReceiptEventContent(receipt_content), | ||||||
|  |                 room_id: body.room_id.clone(), | ||||||
|  |             }, | ||||||
|  |         )), | ||||||
|  |         &db.globals, | ||||||
|  |     )?; | ||||||
| 
 | 
 | ||||||
|     db.flush().await?; |     db.flush().await?; | ||||||
| 
 | 
 | ||||||
|     Ok(set_read_marker::Response.into()) |     Ok(create_receipt::Response.into()) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -9,6 +9,7 @@ use std::{ | ||||||
| use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; | use crate::{appservice_server, server_server, utils, Error, PduEvent, Result}; | ||||||
| use federation::transactions::send_transaction_message; | use federation::transactions::send_transaction_message; | ||||||
| use log::info; | use log::info; | ||||||
|  | use ring::digest; | ||||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||||
| use ruma::{ | use ruma::{ | ||||||
|     api::{appservice, federation, OutgoingRequest}, |     api::{appservice, federation, OutgoingRequest}, | ||||||
|  | @ -229,6 +230,13 @@ impl Sending { | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     fn calculate_hash(keys: &[IVec]) -> Vec<u8> { | ||||||
|  |         // We only hash the pdu's event ids, not the whole pdu
 | ||||||
|  |         let bytes = keys.join(&0xff); | ||||||
|  |         let hash = digest::digest(&digest::SHA256, &bytes); | ||||||
|  |         hash.as_ref().to_owned() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     async fn handle_event( |     async fn handle_event( | ||||||
|         server: Box<ServerName>, |         server: Box<ServerName>, | ||||||
|         is_appservice: bool, |         is_appservice: bool, | ||||||
|  | @ -266,7 +274,10 @@ impl Sending { | ||||||
|                     .unwrap(), // TODO: handle error
 |                     .unwrap(), // TODO: handle error
 | ||||||
|                 appservice::event::push_events::v1::Request { |                 appservice::event::push_events::v1::Request { | ||||||
|                     events: &pdu_jsons, |                     events: &pdu_jsons, | ||||||
|                     txn_id: &utils::random_string(16), |                     txn_id: &base64::encode_config( | ||||||
|  |                         Self::calculate_hash(&pdu_ids), | ||||||
|  |                         base64::URL_SAFE_NO_PAD, | ||||||
|  |                     ), | ||||||
|                 }, |                 }, | ||||||
|             ) |             ) | ||||||
|             .await |             .await | ||||||
|  | @ -309,7 +320,10 @@ impl Sending { | ||||||
|                     pdus: &pdu_jsons, |                     pdus: &pdu_jsons, | ||||||
|                     edus: &[], |                     edus: &[], | ||||||
|                     origin_server_ts: SystemTime::now(), |                     origin_server_ts: SystemTime::now(), | ||||||
|                     transaction_id: &utils::random_string(16), |                     transaction_id: &base64::encode_config( | ||||||
|  |                         Self::calculate_hash(&pdu_ids), | ||||||
|  |                         base64::URL_SAFE_NO_PAD, | ||||||
|  |                     ), | ||||||
|                 }, |                 }, | ||||||
|             ) |             ) | ||||||
|             .await |             .await | ||||||
|  |  | ||||||
|  | @ -90,7 +90,7 @@ fn setup_rocket() -> rocket::Rocket { | ||||||
|                 client_server::get_backup_key_sessions_route, |                 client_server::get_backup_key_sessions_route, | ||||||
|                 client_server::get_backup_keys_route, |                 client_server::get_backup_keys_route, | ||||||
|                 client_server::set_read_marker_route, |                 client_server::set_read_marker_route, | ||||||
|                 client_server::set_receipt_route, |                 client_server::create_receipt_route, | ||||||
|                 client_server::create_typing_event_route, |                 client_server::create_typing_event_route, | ||||||
|                 client_server::create_room_route, |                 client_server::create_room_route, | ||||||
|                 client_server::redact_event_route, |                 client_server::redact_event_route, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue