Merge branch 'sending' into 'master'
Sending improvements See merge request famedly/conduit!2
This commit is contained in:
		
						commit
						998f9e2109
					
				
					 19 changed files with 371 additions and 112 deletions
				
			
		|  | @ -241,6 +241,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -264,6 +265,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -300,6 +302,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -319,6 +322,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -340,6 +344,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -359,6 +364,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -380,6 +386,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -398,6 +405,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -422,6 +430,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -447,6 +456,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|         db.rooms.build_and_append_pdu( | ||||
|  | @ -468,6 +478,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -494,6 +505,7 @@ pub async fn register_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  | @ -666,6 +678,7 @@ pub async fn deactivate_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  |  | |||
|  | @ -103,6 +103,7 @@ pub async fn leave_room_route( | |||
|                 ErrorKind::BadState, | ||||
|                 "Cannot leave a room you are not a member of.", | ||||
|             ))? | ||||
|             .1 | ||||
|             .content, | ||||
|     ) | ||||
|     .expect("from_value::<Raw<..>> can never fail") | ||||
|  | @ -123,6 +124,7 @@ pub async fn leave_room_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -161,6 +163,7 @@ pub async fn invite_user_route( | |||
|             &body.room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -193,6 +196,7 @@ pub async fn kick_user_route( | |||
|                 ErrorKind::BadState, | ||||
|                 "Cannot kick member that's not in the room.", | ||||
|             ))? | ||||
|             .1 | ||||
|             .content, | ||||
|     ) | ||||
|     .expect("Raw::from_value always works") | ||||
|  | @ -214,6 +218,7 @@ pub async fn kick_user_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -249,7 +254,7 @@ pub async fn ban_user_route( | |||
|                 is_direct: None, | ||||
|                 third_party_invite: None, | ||||
|             }), | ||||
|             |event| { | ||||
|             |(_, event)| { | ||||
|                 let mut event = | ||||
|                     serde_json::from_value::<Raw<member::MemberEventContent>>(event.content) | ||||
|                         .expect("Raw::from_value always works") | ||||
|  | @ -272,6 +277,7 @@ pub async fn ban_user_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -301,6 +307,7 @@ pub async fn unban_user_route( | |||
|                 ErrorKind::BadState, | ||||
|                 "Cannot unban a user who is not banned.", | ||||
|             ))? | ||||
|             .1 | ||||
|             .content, | ||||
|     ) | ||||
|     .expect("from_value::<Raw<..>> can never fail") | ||||
|  | @ -321,6 +328,7 @@ pub async fn unban_user_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -670,7 +678,7 @@ async fn join_room_by_id_helper( | |||
|                 pdu_id.clone().into(), | ||||
|                 &db.globals, | ||||
|                 &db.account_data, | ||||
|                 &db.sending, | ||||
|                 &db.admin, | ||||
|             )?; | ||||
| 
 | ||||
|             if state_events.contains(ev_id) { | ||||
|  | @ -700,6 +708,7 @@ async fn join_room_by_id_helper( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  |  | |||
|  | @ -67,6 +67,7 @@ pub async fn send_message_event_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  |  | |||
|  | @ -48,6 +48,7 @@ pub async fn set_displayname_route( | |||
|                                     "Tried to send displayname update for user not in the room.", | ||||
|                                 ) | ||||
|                             })? | ||||
|                             .1 | ||||
|                             .content | ||||
|                             .clone(), | ||||
|                     ) | ||||
|  | @ -64,6 +65,7 @@ pub async fn set_displayname_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  | @ -142,6 +144,7 @@ pub async fn set_avatar_url_route( | |||
|                                     "Tried to send avatar url update for user not in the room.", | ||||
|                                 ) | ||||
|                             })? | ||||
|                             .1 | ||||
|                             .content | ||||
|                             .clone(), | ||||
|                     ) | ||||
|  | @ -158,6 +161,7 @@ pub async fn set_avatar_url_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
| 
 | ||||
|  |  | |||
|  | @ -33,6 +33,7 @@ pub async fn redact_event_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  |  | |||
|  | @ -65,6 +65,7 @@ pub async fn create_room_route( | |||
|         &room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -88,6 +89,7 @@ pub async fn create_room_route( | |||
|         &room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -131,6 +133,7 @@ pub async fn create_room_route( | |||
|         &room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -165,6 +168,7 @@ pub async fn create_room_route( | |||
|         &room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -184,6 +188,7 @@ pub async fn create_room_route( | |||
|         &room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -211,6 +216,7 @@ pub async fn create_room_route( | |||
|         &room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -232,6 +238,7 @@ pub async fn create_room_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  | @ -255,6 +262,7 @@ pub async fn create_room_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  | @ -275,6 +283,7 @@ pub async fn create_room_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  | @ -300,6 +309,7 @@ pub async fn create_room_route( | |||
|             &room_id, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  | @ -387,6 +397,7 @@ pub async fn upgrade_room_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -395,6 +406,7 @@ pub async fn upgrade_room_route( | |||
|         db.rooms | ||||
|             .room_state_get(&body.room_id, &EventType::RoomCreate, "")? | ||||
|             .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? | ||||
|             .1 | ||||
|             .content, | ||||
|     ) | ||||
|     .expect("Raw::from_value always works") | ||||
|  | @ -428,6 +440,7 @@ pub async fn upgrade_room_route( | |||
|         &replacement_room, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -451,6 +464,7 @@ pub async fn upgrade_room_route( | |||
|         &replacement_room, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  | @ -470,7 +484,7 @@ pub async fn upgrade_room_route( | |||
|     // Replicate transferable state events to the new room
 | ||||
|     for event_type in transferable_state_events { | ||||
|         let event_content = match db.rooms.room_state_get(&body.room_id, &event_type, "")? { | ||||
|             Some(v) => v.content.clone(), | ||||
|             Some((_, v)) => v.content.clone(), | ||||
|             None => continue, // Skipping missing events.
 | ||||
|         }; | ||||
| 
 | ||||
|  | @ -486,6 +500,7 @@ pub async fn upgrade_room_route( | |||
|             &replacement_room, | ||||
|             &db.globals, | ||||
|             &db.sending, | ||||
|             &db.admin, | ||||
|             &db.account_data, | ||||
|         )?; | ||||
|     } | ||||
|  | @ -502,6 +517,7 @@ pub async fn upgrade_room_route( | |||
|             db.rooms | ||||
|                 .room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")? | ||||
|                 .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))? | ||||
|                 .1 | ||||
|                 .content, | ||||
|         ) | ||||
|         .expect("database contains invalid PDU") | ||||
|  | @ -530,6 +546,7 @@ pub async fn upgrade_room_route( | |||
|         &body.room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  |  | |||
|  | @ -109,7 +109,7 @@ pub async fn get_state_events_route( | |||
|         if !matches!( | ||||
|             db.rooms | ||||
|                 .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? | ||||
|                 .map(|event| { | ||||
|                 .map(|(_, event)| { | ||||
|                     serde_json::from_value::<HistoryVisibilityEventContent>(event.content) | ||||
|                         .map_err(|_| { | ||||
|                             Error::bad_database( | ||||
|  | @ -154,7 +154,7 @@ pub async fn get_state_events_for_key_route( | |||
|         if !matches!( | ||||
|             db.rooms | ||||
|                 .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? | ||||
|                 .map(|event| { | ||||
|                 .map(|(_, event)| { | ||||
|                     serde_json::from_value::<HistoryVisibilityEventContent>(event.content) | ||||
|                         .map_err(|_| { | ||||
|                             Error::bad_database( | ||||
|  | @ -178,7 +178,8 @@ pub async fn get_state_events_for_key_route( | |||
|         .ok_or(Error::BadRequest( | ||||
|             ErrorKind::NotFound, | ||||
|             "State event not found.", | ||||
|         ))?; | ||||
|         ))? | ||||
|         .1; | ||||
| 
 | ||||
|     Ok(get_state_events_for_key::Response { | ||||
|         content: serde_json::value::to_raw_value(&event.content) | ||||
|  | @ -203,7 +204,7 @@ pub async fn get_state_events_for_empty_key_route( | |||
|         if !matches!( | ||||
|             db.rooms | ||||
|                 .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")? | ||||
|                 .map(|event| { | ||||
|                 .map(|(_, event)| { | ||||
|                     serde_json::from_value::<HistoryVisibilityEventContent>(event.content) | ||||
|                         .map_err(|_| { | ||||
|                             Error::bad_database( | ||||
|  | @ -227,7 +228,8 @@ pub async fn get_state_events_for_empty_key_route( | |||
|         .ok_or(Error::BadRequest( | ||||
|             ErrorKind::NotFound, | ||||
|             "State event not found.", | ||||
|         ))?; | ||||
|         ))? | ||||
|         .1; | ||||
| 
 | ||||
|     Ok(get_state_events_for_empty_key::Response { | ||||
|         content: serde_json::value::to_raw_value(&event) | ||||
|  | @ -282,6 +284,7 @@ pub async fn send_state_event_for_key_helper( | |||
|         &room_id, | ||||
|         &db.globals, | ||||
|         &db.sending, | ||||
|         &db.admin, | ||||
|         &db.account_data, | ||||
|     )?; | ||||
| 
 | ||||
|  |  | |||
|  | @ -440,23 +440,8 @@ pub async fn sync_events_route( | |||
|     let mut left_rooms = BTreeMap::new(); | ||||
|     for room_id in db.rooms.rooms_left(&sender_user) { | ||||
|         let room_id = room_id?; | ||||
|         let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?; | ||||
|         let room_events = pdus | ||||
|             .filter_map(|pdu| pdu.ok()) // Filter out buggy events
 | ||||
|             .map(|(_, pdu)| pdu.to_sync_room_event()) | ||||
|             .collect(); | ||||
| 
 | ||||
|         let left_room = sync_events::LeftRoom { | ||||
|             account_data: sync_events::AccountData { events: Vec::new() }, | ||||
|             timeline: sync_events::Timeline { | ||||
|                 limited: false, | ||||
|                 prev_batch: Some(next_batch.clone()), | ||||
|                 events: room_events, | ||||
|             }, | ||||
|             state: sync_events::State { events: Vec::new() }, | ||||
|         }; | ||||
| 
 | ||||
|         let since_member = db | ||||
|         let since_member = if let Some(since_member) = db | ||||
|             .rooms | ||||
|             .pdus_after(sender_user, &room_id, since) | ||||
|             .next() | ||||
|  | @ -475,20 +460,25 @@ pub async fn sync_events_route( | |||
|                     .ok_or_else(|| Error::bad_database("State hash in db doesn't have a state.")) | ||||
|                     .ok() | ||||
|             }) | ||||
|             .and_then(|pdu| { | ||||
|             .and_then(|(pdu_id, pdu)| { | ||||
|                 serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>( | ||||
|                     pdu.content, | ||||
|                     pdu.content.clone(), | ||||
|                 ) | ||||
|                 .expect("Raw::from_value always works") | ||||
|                 .deserialize() | ||||
|                 .map_err(|_| Error::bad_database("Invalid PDU in database.")) | ||||
|                 .map(|content| (pdu_id, pdu, content)) | ||||
|                 .ok() | ||||
|             }); | ||||
|             }) { | ||||
|             since_member | ||||
|         } else { | ||||
|             // We couldn't find the since_member event. This is very weird - we better abort
 | ||||
|             continue; | ||||
|         }; | ||||
| 
 | ||||
|         let left_since_last_sync = | ||||
|             since_member.map_or(false, |member| member.membership == MembershipState::Join); | ||||
|         let left_since_last_sync = since_member.2.membership == MembershipState::Join; | ||||
| 
 | ||||
|         if left_since_last_sync { | ||||
|         let left_room = if left_since_last_sync { | ||||
|             device_list_left.extend( | ||||
|                 db.rooms | ||||
|                     .room_members(&room_id) | ||||
|  | @ -503,7 +493,35 @@ pub async fn sync_events_route( | |||
|                         !share_encrypted_room(&db, sender_user, user_id, &room_id) | ||||
|                     }), | ||||
|             ); | ||||
|         } | ||||
| 
 | ||||
|             let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?; | ||||
|             let mut room_events = pdus | ||||
|                 .filter_map(|pdu| pdu.ok()) // Filter out buggy events
 | ||||
|                 .take_while(|(pdu_id, _)| since_member.0 != pdu_id) | ||||
|                 .map(|(_, pdu)| pdu.to_sync_room_event()) | ||||
|                 .collect::<Vec<_>>(); | ||||
|             room_events.push(since_member.1.to_sync_room_event()); | ||||
| 
 | ||||
|             sync_events::LeftRoom { | ||||
|                 account_data: sync_events::AccountData { events: Vec::new() }, | ||||
|                 timeline: sync_events::Timeline { | ||||
|                     limited: false, | ||||
|                     prev_batch: Some(next_batch.clone()), | ||||
|                     events: room_events, | ||||
|                 }, | ||||
|                 state: sync_events::State { events: Vec::new() }, | ||||
|             } | ||||
|         } else { | ||||
|             sync_events::LeftRoom { | ||||
|                 account_data: sync_events::AccountData { events: Vec::new() }, | ||||
|                 timeline: sync_events::Timeline { | ||||
|                     limited: false, | ||||
|                     prev_batch: Some(next_batch.clone()), | ||||
|                     events: Vec::new(), | ||||
|                 }, | ||||
|                 state: sync_events::State { events: Vec::new() }, | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         if !left_room.is_empty() { | ||||
|             left_rooms.insert(room_id.clone(), left_room); | ||||
|  |  | |||
|  | @ -1,4 +1,5 @@ | |||
| pub mod account_data; | ||||
| pub mod admin; | ||||
| pub mod globals; | ||||
| pub mod key_backups; | ||||
| pub mod media; | ||||
|  | @ -12,10 +13,14 @@ use crate::{Error, Result}; | |||
| use directories::ProjectDirs; | ||||
| use futures::StreamExt; | ||||
| use log::info; | ||||
| use rocket::{futures, Config}; | ||||
| use rocket::{ | ||||
|     futures::{self, channel::mpsc}, | ||||
|     Config, | ||||
| }; | ||||
| use ruma::{DeviceId, UserId}; | ||||
| use std::{convert::TryFrom, fs::remove_dir_all}; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Database { | ||||
|     pub globals: globals::Globals, | ||||
|     pub users: users::Users, | ||||
|  | @ -26,6 +31,7 @@ pub struct Database { | |||
|     pub key_backups: key_backups::KeyBackups, | ||||
|     pub transaction_ids: transaction_ids::TransactionIds, | ||||
|     pub sending: sending::Sending, | ||||
|     pub admin: admin::Admin, | ||||
|     pub _db: sled::Db, | ||||
| } | ||||
| 
 | ||||
|  | @ -80,7 +86,9 @@ impl Database { | |||
| 
 | ||||
|         info!("Opened sled database at {}", path); | ||||
| 
 | ||||
|         Ok(Self { | ||||
|         let (admin_sender, admin_receiver) = mpsc::unbounded(); | ||||
| 
 | ||||
|         let db = Self { | ||||
|             globals: globals::Globals::load(db.open_tree("global")?, config)?, | ||||
|             users: users::Users { | ||||
|                 userid_password: db.open_tree("userid_password")?, | ||||
|  | @ -149,10 +157,18 @@ impl Database { | |||
|                 userdevicetxnid_response: db.open_tree("userdevicetxnid_response")?, | ||||
|             }, | ||||
|             sending: sending::Sending { | ||||
|                 serverpduids: db.open_tree("serverpduids")?, | ||||
|                 servernamepduids: db.open_tree("servernamepduids")?, | ||||
|                 servercurrentpdus: db.open_tree("servercurrentpdus")?, | ||||
|             }, | ||||
|             admin: admin::Admin { | ||||
|                 sender: admin_sender, | ||||
|             }, | ||||
|             _db: db, | ||||
|         }) | ||||
|         }; | ||||
| 
 | ||||
|         db.admin.start_handler(db.clone(), admin_receiver); | ||||
| 
 | ||||
|         Ok(db) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) { | ||||
|  |  | |||
|  | @ -8,6 +8,7 @@ use serde::{de::DeserializeOwned, Serialize}; | |||
| use sled::IVec; | ||||
| use std::{collections::HashMap, convert::TryFrom}; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct AccountData { | ||||
|     pub(super) roomuserdataid_accountdata: sled::Tree, // RoomUserDataId = Room + User + Count + Type
 | ||||
| } | ||||
|  |  | |||
							
								
								
									
										74
									
								
								src/database/admin.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										74
									
								
								src/database/admin.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,74 @@ | |||
| use std::convert::{TryFrom, TryInto}; | ||||
| 
 | ||||
| use crate::{pdu::PduBuilder, Error}; | ||||
| use rocket::futures::{channel::mpsc, stream::StreamExt}; | ||||
| use ruma::{events::room::message, events::EventType, UserId}; | ||||
| use tokio::select; | ||||
| 
 | ||||
| pub enum AdminCommand { | ||||
|     SendTextMessage(message::TextMessageEventContent), | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Admin { | ||||
|     pub sender: mpsc::UnboundedSender<AdminCommand>, | ||||
| } | ||||
| 
 | ||||
| impl Admin { | ||||
|     pub fn start_handler( | ||||
|         &self, | ||||
|         db: super::Database, | ||||
|         mut receiver: mpsc::UnboundedReceiver<AdminCommand>, | ||||
|     ) { | ||||
|         tokio::spawn(async move { | ||||
|             // TODO: Use futures when we have long admin commands
 | ||||
|             //let mut futures = FuturesUnordered::new();
 | ||||
| 
 | ||||
|             let conduit_user = UserId::try_from(format!("@conduit:{}", db.globals.server_name())) | ||||
|                 .expect("@conduit:server_name is valid"); | ||||
| 
 | ||||
|             let conduit_room = db | ||||
|                 .rooms | ||||
|                 .id_from_alias( | ||||
|                     &format!("#admins:{}", db.globals.server_name()) | ||||
|                         .try_into() | ||||
|                         .expect("#admins:server_name is a valid room alias"), | ||||
|                 ) | ||||
|                 .unwrap() | ||||
|                 .ok_or_else(|| Error::BadConfig("Conduit instance does not have an #admins room.")) | ||||
|                 .unwrap(); | ||||
| 
 | ||||
|             loop { | ||||
|                 select! { | ||||
|                     Some(event) = receiver.next() => { | ||||
|                         match event { | ||||
|                             AdminCommand::SendTextMessage(message) => { | ||||
|                                 println!("{:?}", message); | ||||
| 
 | ||||
|                                 db.rooms.build_and_append_pdu( | ||||
|                                     PduBuilder { | ||||
|                                         event_type: EventType::RoomMessage, | ||||
|                                         content: serde_json::to_value(message).expect("event is valid, we just created it"), | ||||
|                                         unsigned: None, | ||||
|                                         state_key: None, | ||||
|                                         redacts: None, | ||||
|                                     }, | ||||
|                                     &conduit_user, | ||||
|                                     &conduit_room, | ||||
|                                     &db.globals, | ||||
|                                     &db.sending, | ||||
|                                     &db.admin, | ||||
|                                     &db.account_data, | ||||
|                                 ).unwrap(); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|     } | ||||
| 
 | ||||
|     pub fn send(&self, command: AdminCommand) { | ||||
|         self.sender.unbounded_send(command).unwrap() | ||||
|     } | ||||
| } | ||||
|  | @ -8,6 +8,7 @@ use ruma::{ | |||
| }; | ||||
| use std::{collections::BTreeMap, convert::TryFrom}; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct KeyBackups { | ||||
|     pub(super) backupid_algorithm: sled::Tree, // BackupId = UserId + Version(Count)
 | ||||
|     pub(super) backupid_etag: sled::Tree,      // BackupId = UserId + Version(Count)
 | ||||
|  |  | |||
|  | @ -9,6 +9,7 @@ pub struct FileMeta { | |||
|     pub file: Vec<u8>, | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Media { | ||||
|     pub(super) mediaid_file: sled::Tree, // MediaId = MXC + WidthHeight + Filename + ContentType
 | ||||
| } | ||||
|  |  | |||
|  | @ -27,6 +27,8 @@ use std::{ | |||
|     sync::Arc, | ||||
| }; | ||||
| 
 | ||||
| use super::admin::AdminCommand; | ||||
| 
 | ||||
| /// The unique identifier of each state group.
 | ||||
| ///
 | ||||
| /// This is created when a state group is added to the database by
 | ||||
|  | @ -169,7 +171,7 @@ impl Rooms { | |||
|         state_hash: &StateHashId, | ||||
|         event_type: &EventType, | ||||
|         state_key: &str, | ||||
|     ) -> Result<Option<PduEvent>> { | ||||
|     ) -> Result<Option<(IVec, PduEvent)>> { | ||||
|         let mut key = state_hash.to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(&event_type.to_string().as_bytes()); | ||||
|  | @ -177,14 +179,15 @@ impl Rooms { | |||
|         key.extend_from_slice(&state_key.as_bytes()); | ||||
| 
 | ||||
|         self.stateid_pduid.get(&key)?.map_or(Ok(None), |pdu_id| { | ||||
|             Ok::<_, Error>(Some( | ||||
|             Ok::<_, Error>(Some(( | ||||
|                 pdu_id.clone(), | ||||
|                 serde_json::from_slice::<PduEvent>( | ||||
|                     &self.pduid_pdu.get(pdu_id)?.ok_or_else(|| { | ||||
|                     &self.pduid_pdu.get(&pdu_id)?.ok_or_else(|| { | ||||
|                         Error::bad_database("PDU in state not found in database.") | ||||
|                     })?, | ||||
|                 ) | ||||
|                 .map_err(|_| Error::bad_database("Invalid PDU bytes in room state."))?, | ||||
|             )) | ||||
|             ))) | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|  | @ -216,7 +219,7 @@ impl Rooms { | |||
| 
 | ||||
|         let mut events = StateMap::new(); | ||||
|         for (event_type, state_key) in auth_events { | ||||
|             if let Some(pdu) = self.room_state_get(room_id, &event_type, &state_key)? { | ||||
|             if let Some((_, pdu)) = self.room_state_get(room_id, &event_type, &state_key)? { | ||||
|                 events.insert((event_type, state_key), pdu); | ||||
|             } | ||||
|         } | ||||
|  | @ -299,7 +302,7 @@ impl Rooms { | |||
|         room_id: &RoomId, | ||||
|         event_type: &EventType, | ||||
|         state_key: &str, | ||||
|     ) -> Result<Option<PduEvent>> { | ||||
|     ) -> Result<Option<(IVec, PduEvent)>> { | ||||
|         if let Some(current_state_hash) = self.current_state_hash(room_id)? { | ||||
|             self.state_get(¤t_state_hash, event_type, state_key) | ||||
|         } else { | ||||
|  | @ -367,7 +370,7 @@ impl Rooms { | |||
|     } | ||||
| 
 | ||||
|     /// Returns the pdu.
 | ||||
|     pub fn get_pdu_json_from_id(&self, pdu_id: &IVec) -> Result<Option<serde_json::Value>> { | ||||
|     pub fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result<Option<serde_json::Value>> { | ||||
|         self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| { | ||||
|             Ok(Some( | ||||
|                 serde_json::from_slice(&pdu) | ||||
|  | @ -442,7 +445,7 @@ impl Rooms { | |||
|         pdu_id: IVec, | ||||
|         globals: &super::globals::Globals, | ||||
|         account_data: &super::account_data::AccountData, | ||||
|         sending: &super::sending::Sending, | ||||
|         admin: &super::admin::Admin, | ||||
|     ) -> Result<()> { | ||||
|         self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?; | ||||
| 
 | ||||
|  | @ -513,28 +516,13 @@ impl Rooms { | |||
|                         if let Some(command) = parts.next() { | ||||
|                             let args = parts.collect::<Vec<_>>(); | ||||
| 
 | ||||
|                             self.build_and_append_pdu( | ||||
|                                 PduBuilder { | ||||
|                                     event_type: EventType::RoomMessage, | ||||
|                                     content: serde_json::to_value( | ||||
|                                         message::TextMessageEventContent { | ||||
|                                             body: format!("Command: {}, Args: {:?}", command, args), | ||||
|                                             formatted: None, | ||||
|                                             relates_to: None, | ||||
|                                         }, | ||||
|                                     ) | ||||
|                                     .expect("event is valid, we just created it"), | ||||
|                                     unsigned: None, | ||||
|                                     state_key: None, | ||||
|                                     redacts: None, | ||||
|                             admin.send(AdminCommand::SendTextMessage( | ||||
|                                 message::TextMessageEventContent { | ||||
|                                     body: format!("Command: {}, Args: {:?}", command, args), | ||||
|                                     formatted: None, | ||||
|                                     relates_to: None, | ||||
|                                 }, | ||||
|                                 &UserId::try_from(format!("@conduit:{}", globals.server_name())) | ||||
|                                     .expect("@conduit:server_name is valid"), | ||||
|                                 &pdu.room_id, | ||||
|                                 &globals, | ||||
|                                 &sending, | ||||
|                                 &account_data, | ||||
|                             )?; | ||||
|                             )); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | @ -611,6 +599,7 @@ impl Rooms { | |||
|         room_id: &RoomId, | ||||
|         globals: &super::globals::Globals, | ||||
|         sending: &super::sending::Sending, | ||||
|         admin: &super::admin::Admin, | ||||
|         account_data: &super::account_data::AccountData, | ||||
|     ) -> Result<EventId> { | ||||
|         let PduBuilder { | ||||
|  | @ -653,7 +642,7 @@ impl Rooms { | |||
|                                 }, | ||||
|                         }) | ||||
|                     }, | ||||
|                     |power_levels| { | ||||
|                     |(_, power_levels)| { | ||||
|                         Ok(serde_json::from_value::<Raw<PowerLevelsEventContent>>( | ||||
|                             power_levels.content, | ||||
|                         ) | ||||
|  | @ -664,15 +653,18 @@ impl Rooms { | |||
|                 )?; | ||||
|             let sender_membership = self | ||||
|                 .room_state_get(&room_id, &EventType::RoomMember, &sender.to_string())? | ||||
|                 .map_or(Ok::<_, Error>(member::MembershipState::Leave), |pdu| { | ||||
|                     Ok( | ||||
|                         serde_json::from_value::<Raw<member::MemberEventContent>>(pdu.content) | ||||
|                             .expect("Raw::from_value always works.") | ||||
|                             .deserialize() | ||||
|                             .map_err(|_| Error::bad_database("Invalid Member event in db."))? | ||||
|                             .membership, | ||||
|                     ) | ||||
|                 })?; | ||||
|                 .map_or( | ||||
|                     Ok::<_, Error>(member::MembershipState::Leave), | ||||
|                     |(_, pdu)| { | ||||
|                         Ok( | ||||
|                             serde_json::from_value::<Raw<member::MemberEventContent>>(pdu.content) | ||||
|                                 .expect("Raw::from_value always works.") | ||||
|                                 .deserialize() | ||||
|                                 .map_err(|_| Error::bad_database("Invalid Member event in db."))? | ||||
|                                 .membership, | ||||
|                         ) | ||||
|                     }, | ||||
|                 )?; | ||||
| 
 | ||||
|             let sender_power = power_levels.users.get(&sender).map_or_else( | ||||
|                 || { | ||||
|  | @ -759,7 +751,7 @@ impl Rooms { | |||
| 
 | ||||
|         let mut unsigned = unsigned.unwrap_or_default(); | ||||
|         if let Some(state_key) = &state_key { | ||||
|             if let Some(prev_pdu) = self.room_state_get(&room_id, &event_type, &state_key)? { | ||||
|             if let Some((_, prev_pdu)) = self.room_state_get(&room_id, &event_type, &state_key)? { | ||||
|                 unsigned.insert("prev_content".to_owned(), prev_pdu.content); | ||||
|                 unsigned.insert( | ||||
|                     "prev_sender".to_owned(), | ||||
|  | @ -845,7 +837,7 @@ impl Rooms { | |||
|             pdu_id.clone().into(), | ||||
|             globals, | ||||
|             account_data, | ||||
|             sending, | ||||
|             admin, | ||||
|         )?; | ||||
| 
 | ||||
|         for server in self | ||||
|  | @ -1017,7 +1009,7 @@ impl Rooms { | |||
|                     // Check if the room has a predecessor
 | ||||
|                     if let Some(predecessor) = self | ||||
|                         .room_state_get(&room_id, &EventType::RoomCreate, "")? | ||||
|                         .and_then(|create| { | ||||
|                         .and_then(|(_, create)| { | ||||
|                             serde_json::from_value::< | ||||
|                                 Raw<ruma::events::room::create::CreateEventContent>, | ||||
|                             >(create.content) | ||||
|  |  | |||
|  | @ -1,63 +1,159 @@ | |||
| use std::{collections::HashSet, convert::TryFrom, time::SystemTime}; | ||||
| use std::{collections::HashMap, convert::TryFrom, time::SystemTime}; | ||||
| 
 | ||||
| use crate::{server_server, utils, Error, PduEvent, Result}; | ||||
| use federation::transactions::send_transaction_message; | ||||
| use log::warn; | ||||
| use log::debug; | ||||
| use rocket::futures::stream::{FuturesUnordered, StreamExt}; | ||||
| use ruma::{api::federation, ServerName}; | ||||
| use sled::IVec; | ||||
| use tokio::select; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Sending { | ||||
|     /// The state for a given state hash.
 | ||||
|     pub(super) serverpduids: sled::Tree, // ServerPduId = ServerName + PduId
 | ||||
|     pub(super) servernamepduids: sled::Tree, // ServernamePduId = ServerName + PduId
 | ||||
|     pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = ServerName + PduId (pduid can be empty for reservation)
 | ||||
| } | ||||
| 
 | ||||
| impl Sending { | ||||
|     pub fn start_handler(&self, globals: &super::globals::Globals, rooms: &super::rooms::Rooms) { | ||||
|         let serverpduids = self.serverpduids.clone(); | ||||
|         let servernamepduids = self.servernamepduids.clone(); | ||||
|         let servercurrentpdus = self.servercurrentpdus.clone(); | ||||
|         let rooms = rooms.clone(); | ||||
|         let globals = globals.clone(); | ||||
| 
 | ||||
|         tokio::spawn(async move { | ||||
|             let mut futures = FuturesUnordered::new(); | ||||
|             let mut waiting_servers = HashSet::new(); | ||||
| 
 | ||||
|             let mut subscriber = serverpduids.watch_prefix(b""); | ||||
|             // Retry requests we could not finish yet
 | ||||
|             let mut current_transactions = HashMap::new(); | ||||
| 
 | ||||
|             for (server, pdu) in servercurrentpdus | ||||
|                 .iter() | ||||
|                 .filter_map(|r| r.ok()) | ||||
|                 .map(|(key, _)| { | ||||
|                     let mut parts = key.splitn(2, |&b| b == 0xff); | ||||
|                     let server = parts.next().expect("splitn always returns one element"); | ||||
|                     let pdu = parts.next().ok_or_else(|| { | ||||
|                         Error::bad_database("Invalid bytes in servercurrentpdus.") | ||||
|                     })?; | ||||
| 
 | ||||
|                     Ok::<_, Error>(( | ||||
|                         Box::<ServerName>::try_from(utils::string_from_bytes(&server).map_err( | ||||
|                             |_| { | ||||
|                                 Error::bad_database( | ||||
|                                     "Invalid server bytes in server_currenttransaction", | ||||
|                                 ) | ||||
|                             }, | ||||
|                         )?) | ||||
|                         .map_err(|_| { | ||||
|                             Error::bad_database( | ||||
|                                 "Invalid server string in server_currenttransaction", | ||||
|                             ) | ||||
|                         })?, | ||||
|                         IVec::from(pdu), | ||||
|                     )) | ||||
|                 }) | ||||
|                 .filter_map(|r| r.ok()) | ||||
|                 .filter(|(_, pdu)| !pdu.is_empty()) // Skip reservation key
 | ||||
|                 .take(50) | ||||
|             // This should not contain more than 50 anyway
 | ||||
|             { | ||||
|                 current_transactions | ||||
|                     .entry(server) | ||||
|                     .or_insert_with(Vec::new) | ||||
|                     .push(pdu); | ||||
|             } | ||||
| 
 | ||||
|             for (server, pdus) in current_transactions { | ||||
|                 futures.push(Self::handle_event(server, pdus, &globals, &rooms)); | ||||
|             } | ||||
| 
 | ||||
|             let mut subscriber = servernamepduids.watch_prefix(b""); | ||||
|             loop { | ||||
|                 select! { | ||||
|                     Some(server) = futures.next() => { | ||||
|                         warn!("response: {:?}", &server); | ||||
|                         warn!("futures left: {}", &futures.len()); | ||||
|                         debug!("response: {:?}", &server); | ||||
|                         match server { | ||||
|                             Ok((server, _response)) => { | ||||
|                                 waiting_servers.remove(&server) | ||||
|                                 let mut prefix = server.as_bytes().to_vec(); | ||||
|                                 prefix.push(0xff); | ||||
| 
 | ||||
|                                 for key in servercurrentpdus | ||||
|                                     .scan_prefix(&prefix) | ||||
|                                     .keys() | ||||
|                                     .filter_map(|r| r.ok()) | ||||
|                                 { | ||||
|                                     // Don't remove reservation yet
 | ||||
|                                     if prefix.len() != key.len() { | ||||
|                                         servercurrentpdus.remove(key).unwrap(); | ||||
|                                     } | ||||
|                                 } | ||||
| 
 | ||||
|                                 // Find events that have been added since starting the last request
 | ||||
|                                 let new_pdus = servernamepduids | ||||
|                                     .scan_prefix(&prefix) | ||||
|                                     .keys() | ||||
|                                     .filter_map(|r| r.ok()) | ||||
|                                     .map(|k| { | ||||
|                                         k.subslice(prefix.len(), k.len() - prefix.len()) | ||||
|                                     }) | ||||
|                                     .take(50) | ||||
|                                     .collect::<Vec<_>>(); | ||||
| 
 | ||||
|                                 if !new_pdus.is_empty() { | ||||
|                                     for pdu_id in &new_pdus { | ||||
|                                         let mut current_key = prefix.clone(); | ||||
|                                         current_key.extend_from_slice(pdu_id); | ||||
|                                         servercurrentpdus.insert(¤t_key, &[]).unwrap(); | ||||
|                                         servernamepduids.remove(¤t_key).unwrap(); | ||||
|                                     } | ||||
| 
 | ||||
|                                     futures.push(Self::handle_event(server, new_pdus, &globals, &rooms)); | ||||
|                                 } else { | ||||
|                                     servercurrentpdus.remove(&prefix).unwrap(); | ||||
|                                     // servercurrentpdus with the prefix should be empty now
 | ||||
|                                 } | ||||
|                             } | ||||
|                             Err((server, _e)) => { | ||||
|                                 waiting_servers.remove(&server) | ||||
|                             Err((_server, _e)) => { | ||||
|                                 // TODO: exponential backoff
 | ||||
|                             } | ||||
|                         }; | ||||
|                     }, | ||||
|                     Some(event) = &mut subscriber => { | ||||
|                         if let sled::Event::Insert { key, .. } = event { | ||||
|                             let serverpduid = key.clone(); | ||||
|                             let mut parts = serverpduid.splitn(2, |&b| b == 0xff); | ||||
|                             let servernamepduid = key.clone(); | ||||
|                             let mut parts = servernamepduid.splitn(2, |&b| b == 0xff); | ||||
| 
 | ||||
|                             if let Some((server, pdu_id)) = utils::string_from_bytes( | ||||
|                                     parts | ||||
|                                         .next() | ||||
|                                         .expect("splitn will always return 1 or more elements"), | ||||
|                                 ) | ||||
|                                 .map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid.")) | ||||
|                                 .map_err(|_| Error::bad_database("ServerName in servernamepduid bytes are invalid.")) | ||||
|                                 .and_then(|server_str|Box::<ServerName>::try_from(server_str) | ||||
|                                     .map_err(|_| Error::bad_database("ServerName in serverpduid is invalid."))) | ||||
|                                     .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))) | ||||
|                                 .ok() | ||||
|                                 .filter(|server| waiting_servers.insert(server.clone())) | ||||
|                                 .and_then(|server| parts | ||||
|                                 .next() | ||||
|                                 .ok_or_else(|| Error::bad_database("Invalid serverpduid in db.")).ok().map(|pdu_id| (server, pdu_id))) | ||||
|                                     .next() | ||||
|                                     .ok_or_else(|| Error::bad_database("Invalid servernamepduid in db.")) | ||||
|                                     .ok() | ||||
|                                     .map(|pdu_id| (server, pdu_id)) | ||||
|                                 ) | ||||
|                                 // TODO: exponential backoff
 | ||||
|                                 .filter(|(server, _)| { | ||||
|                                     let mut prefix = server.to_string().as_bytes().to_vec(); | ||||
|                                     prefix.push(0xff); | ||||
| 
 | ||||
|                                     servercurrentpdus | ||||
|                                         .compare_and_swap(prefix, Option::<&[u8]>::None, Some(&[])) // Try to reserve
 | ||||
|                                         == Ok(Ok(())) | ||||
|                                 }) | ||||
|                             { | ||||
|                                 futures.push(Self::handle_event(server, pdu_id.into(), &globals, &rooms)); | ||||
|                                 servercurrentpdus.insert(&key, &[]).unwrap(); | ||||
|                                 servernamepduids.remove(&key).unwrap(); | ||||
| 
 | ||||
|                                 futures.push(Self::handle_event(server, vec![pdu_id.into()], &globals, &rooms)); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|  | @ -70,38 +166,44 @@ impl Sending { | |||
|         let mut key = server.as_bytes().to_vec(); | ||||
|         key.push(0xff); | ||||
|         key.extend_from_slice(pdu_id); | ||||
|         self.serverpduids.insert(key, b"")?; | ||||
|         self.servernamepduids.insert(key, b"")?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn handle_event( | ||||
|         server: Box<ServerName>, | ||||
|         pdu_id: IVec, | ||||
|         pdu_ids: Vec<IVec>, | ||||
|         globals: &super::globals::Globals, | ||||
|         rooms: &super::rooms::Rooms, | ||||
|     ) -> std::result::Result< | ||||
|         (Box<ServerName>, send_transaction_message::v1::Response), | ||||
|         (Box<ServerName>, Error), | ||||
|     > { | ||||
|         let pdu_json = PduEvent::convert_to_outgoing_federation_event( | ||||
|             rooms | ||||
|                 .get_pdu_json_from_id(&pdu_id) | ||||
|                 .map_err(|e| (server.clone(), e))? | ||||
|                 .ok_or_else(|| { | ||||
|                     ( | ||||
|                         server.clone(), | ||||
|                         Error::bad_database("Event in serverpduids not found in db."), | ||||
|                     ) | ||||
|                 })?, | ||||
|         ); | ||||
|         let pdu_jsons = pdu_ids | ||||
|             .iter() | ||||
|             .map(|pdu_id| { | ||||
|                 Ok::<_, (Box<ServerName>, Error)>(PduEvent::convert_to_outgoing_federation_event( | ||||
|                     rooms | ||||
|                         .get_pdu_json_from_id(pdu_id) | ||||
|                         .map_err(|e| (server.clone(), e))? | ||||
|                         .ok_or_else(|| { | ||||
|                             ( | ||||
|                                 server.clone(), | ||||
|                                 Error::bad_database("Event in servernamepduids not found in db."), | ||||
|                             ) | ||||
|                         })?, | ||||
|                 )) | ||||
|             }) | ||||
|             .filter_map(|r| r.ok()) | ||||
|             .collect::<Vec<_>>(); | ||||
| 
 | ||||
|         server_server::send_request( | ||||
|             &globals, | ||||
|             server.clone(), | ||||
|             send_transaction_message::v1::Request { | ||||
|                 origin: globals.server_name(), | ||||
|                 pdus: &[pdu_json], | ||||
|                 pdus: &pdu_jsons, | ||||
|                 edus: &[], | ||||
|                 origin_server_ts: SystemTime::now(), | ||||
|                 transaction_id: &utils::random_string(16), | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ use crate::Result; | |||
| use ruma::{DeviceId, UserId}; | ||||
| use sled::IVec; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct TransactionIds { | ||||
|     pub(super) userdevicetxnid_response: sled::Tree, // Response can be empty (/sendToDevice) or the event id (/send)
 | ||||
| } | ||||
|  |  | |||
|  | @ -7,6 +7,7 @@ use ruma::{ | |||
|     DeviceId, UserId, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Uiaa { | ||||
|     pub(super) userdeviceid_uiaainfo: sled::Tree, // User-interactive authentication
 | ||||
| } | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ use ruma::{ | |||
| }; | ||||
| use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime}; | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct Users { | ||||
|     pub(super) userid_password: sled::Tree, | ||||
|     pub(super) userid_displayname: sled::Tree, | ||||
|  |  | |||
|  | @ -186,7 +186,10 @@ where | |||
|             let body = reqwest_response | ||||
|                 .bytes() | ||||
|                 .await | ||||
|                 .unwrap() | ||||
|                 .unwrap_or_else(|e| { | ||||
|                     warn!("server error: {}", e); | ||||
|                     Vec::new().into() | ||||
|                 }) // TODO: handle timeout
 | ||||
|                 .into_iter() | ||||
|                 .collect(); | ||||
| 
 | ||||
|  | @ -401,7 +404,7 @@ pub fn send_transaction_message_route<'a>( | |||
|                 pdu_id.clone().into(), | ||||
|                 &db.globals, | ||||
|                 &db.account_data, | ||||
|                 &db.sending, | ||||
|                 &db.admin, | ||||
|             )?; | ||||
|         } | ||||
|     } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue