parent
5aa56b92ee
commit
9f8c45c763
|
@ -56,6 +56,7 @@ pub async fn send_event_to_device_route(
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
.expect("DirectToDevice EDU can be serialized"),
|
.expect("DirectToDevice EDU can be serialized"),
|
||||||
|
db.globals.next_count()?,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -84,8 +84,8 @@ pub enum SendingEventType {
|
||||||
pub struct Sending {
|
pub struct Sending {
|
||||||
/// The state for a given state hash.
|
/// The state for a given state hash.
|
||||||
pub(super) servername_educount: Arc<dyn Tree>, // EduCount: Count of last EDU sync
|
pub(super) servername_educount: Arc<dyn Tree>, // EduCount: Count of last EDU sync
|
||||||
pub(super) servernameevent_data: Arc<dyn Tree>, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / * (for edus), Data = EDU content
|
pub(super) servernameevent_data: Arc<dyn Tree>, // ServernamEvent = (+ / $)SenderKey / ServerName / UserId + PduId / Id (for edus), Data = EDU content
|
||||||
pub(super) servercurrentevent_data: Arc<dyn Tree>, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / * (for edus), Data = EDU content
|
pub(super) servercurrentevent_data: Arc<dyn Tree>, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / Id (for edus), Data = EDU content
|
||||||
pub(super) maximum_requests: Arc<Semaphore>,
|
pub(super) maximum_requests: Arc<Semaphore>,
|
||||||
pub sender: mpsc::UnboundedSender<(Vec<u8>, Vec<u8>)>,
|
pub sender: mpsc::UnboundedSender<(Vec<u8>, Vec<u8>)>,
|
||||||
}
|
}
|
||||||
|
@ -435,10 +435,15 @@ impl Sending {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, server, serialized))]
|
#[tracing::instrument(skip(self, server, serialized))]
|
||||||
pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
pub fn send_reliable_edu(
|
||||||
|
&self,
|
||||||
|
server: &ServerName,
|
||||||
|
serialized: Vec<u8>,
|
||||||
|
id: u64,
|
||||||
|
) -> Result<()> {
|
||||||
let mut key = server.as_bytes().to_vec();
|
let mut key = server.as_bytes().to_vec();
|
||||||
key.push(0xff);
|
key.push(0xff);
|
||||||
key.push(b'*');
|
key.extend_from_slice(&id.to_be_bytes());
|
||||||
self.servernameevent_data.insert(&key, &serialized)?;
|
self.servernameevent_data.insert(&key, &serialized)?;
|
||||||
self.sender.unbounded_send((key, serialized)).unwrap();
|
self.sender.unbounded_send((key, serialized)).unwrap();
|
||||||
|
|
||||||
|
@ -714,10 +719,10 @@ impl Sending {
|
||||||
OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| {
|
OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| {
|
||||||
Error::bad_database("Invalid server string in server_currenttransaction")
|
Error::bad_database("Invalid server string in server_currenttransaction")
|
||||||
})?),
|
})?),
|
||||||
if event.starts_with(b"*") {
|
if value.is_empty() {
|
||||||
SendingEventType::Edu(value)
|
|
||||||
} else {
|
|
||||||
SendingEventType::Pdu(event.to_vec())
|
SendingEventType::Pdu(event.to_vec())
|
||||||
|
} else {
|
||||||
|
SendingEventType::Edu(value)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
} else if key.starts_with(b"$") {
|
} else if key.starts_with(b"$") {
|
||||||
|
@ -732,10 +737,10 @@ impl Sending {
|
||||||
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
||||||
(
|
(
|
||||||
OutgoingKind::Push(user.to_vec(), pushkey.to_vec()),
|
OutgoingKind::Push(user.to_vec(), pushkey.to_vec()),
|
||||||
if event.starts_with(b"*") {
|
if value.is_empty() {
|
||||||
SendingEventType::Edu(value)
|
|
||||||
} else {
|
|
||||||
SendingEventType::Pdu(event.to_vec())
|
SendingEventType::Pdu(event.to_vec())
|
||||||
|
} else {
|
||||||
|
SendingEventType::Edu(value)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
@ -753,10 +758,10 @@ impl Sending {
|
||||||
OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| {
|
OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| {
|
||||||
Error::bad_database("Invalid server string in server_currenttransaction")
|
Error::bad_database("Invalid server string in server_currenttransaction")
|
||||||
})?),
|
})?),
|
||||||
if event.starts_with(b"*") {
|
if value.is_empty() {
|
||||||
SendingEventType::Edu(event[1..].to_vec())
|
|
||||||
} else {
|
|
||||||
SendingEventType::Pdu(event.to_vec())
|
SendingEventType::Pdu(event.to_vec())
|
||||||
|
} else {
|
||||||
|
SendingEventType::Edu(value)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue