fix: signature key fetching, optimize push sending
parent
d4c76f4654
commit
363c629faf
|
@ -1623,7 +1623,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma"
|
name = "ruma"
|
||||||
version = "0.0.2"
|
version = "0.0.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"assign",
|
"assign",
|
||||||
"js_int",
|
"js_int",
|
||||||
|
@ -1643,7 +1642,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-api"
|
name = "ruma-api"
|
||||||
version = "0.17.0-alpha.2"
|
version = "0.17.0-alpha.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"http",
|
"http",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
|
@ -1658,7 +1656,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-api-macros"
|
name = "ruma-api-macros"
|
||||||
version = "0.17.0-alpha.2"
|
version = "0.17.0-alpha.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro-crate",
|
"proc-macro-crate",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
|
@ -1669,7 +1666,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-appservice-api"
|
name = "ruma-appservice-api"
|
||||||
version = "0.2.0-alpha.2"
|
version = "0.2.0-alpha.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ruma-api",
|
"ruma-api",
|
||||||
"ruma-common",
|
"ruma-common",
|
||||||
|
@ -1683,7 +1679,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-client-api"
|
name = "ruma-client-api"
|
||||||
version = "0.10.0-alpha.2"
|
version = "0.10.0-alpha.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"assign",
|
"assign",
|
||||||
"http",
|
"http",
|
||||||
|
@ -1702,7 +1697,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-common"
|
name = "ruma-common"
|
||||||
version = "0.3.0-alpha.1"
|
version = "0.3.0-alpha.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"maplit",
|
"maplit",
|
||||||
|
@ -1715,7 +1709,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-events"
|
name = "ruma-events"
|
||||||
version = "0.22.0-alpha.2"
|
version = "0.22.0-alpha.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"ruma-common",
|
"ruma-common",
|
||||||
|
@ -1729,7 +1722,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-events-macros"
|
name = "ruma-events-macros"
|
||||||
version = "0.22.0-alpha.2"
|
version = "0.22.0-alpha.2"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro-crate",
|
"proc-macro-crate",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
|
@ -1740,7 +1732,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-federation-api"
|
name = "ruma-federation-api"
|
||||||
version = "0.1.0-alpha.1"
|
version = "0.1.0-alpha.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"ruma-api",
|
"ruma-api",
|
||||||
|
@ -1755,7 +1746,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-identifiers"
|
name = "ruma-identifiers"
|
||||||
version = "0.18.0-alpha.1"
|
version = "0.18.0-alpha.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"paste",
|
"paste",
|
||||||
"rand",
|
"rand",
|
||||||
|
@ -1769,7 +1759,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-identifiers-macros"
|
name = "ruma-identifiers-macros"
|
||||||
version = "0.18.0-alpha.1"
|
version = "0.18.0-alpha.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -1780,12 +1769,10 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-identifiers-validation"
|
name = "ruma-identifiers-validation"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-identity-service-api"
|
name = "ruma-identity-service-api"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ruma-api",
|
"ruma-api",
|
||||||
"ruma-common",
|
"ruma-common",
|
||||||
|
@ -1798,7 +1785,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-push-gateway-api"
|
name = "ruma-push-gateway-api"
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"js_int",
|
"js_int",
|
||||||
"ruma-api",
|
"ruma-api",
|
||||||
|
@ -1813,7 +1799,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-serde"
|
name = "ruma-serde"
|
||||||
version = "0.3.0"
|
version = "0.3.0"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"form_urlencoded",
|
"form_urlencoded",
|
||||||
"itoa",
|
"itoa",
|
||||||
|
@ -1826,7 +1811,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-serde-macros"
|
name = "ruma-serde-macros"
|
||||||
version = "0.3.0"
|
version = "0.3.0"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro-crate",
|
"proc-macro-crate",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
|
@ -1837,7 +1821,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ruma-signatures"
|
name = "ruma-signatures"
|
||||||
version = "0.6.0-alpha.1"
|
version = "0.6.0-alpha.1"
|
||||||
source = "git+https://github.com/ruma/ruma?rev=f196f5b6f164973d6b343af31ab4e0457f743675#f196f5b6f164973d6b343af31ab4e0457f743675"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.13.0",
|
"base64 0.13.0",
|
||||||
"ring",
|
"ring",
|
||||||
|
@ -2105,7 +2088,6 @@ checksum = "3015a7d0a5fd5105c91c3710d42f9ccf0abfb287d62206484dcc67f9569a6483"
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "state-res"
|
name = "state-res"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/ruma/state-res?rev=34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488#34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"itertools 0.10.0",
|
"itertools 0.10.0",
|
||||||
"log",
|
"log",
|
||||||
|
|
|
@ -18,16 +18,15 @@ rocket = { git = "https://github.com/SergioBenitez/Rocket.git", rev = "93e62c86e
|
||||||
#rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] }
|
#rocket = { git = "https://github.com/timokoesters/Rocket.git", branch = "empty_parameters", default-features = false, features = ["tls"] }
|
||||||
|
|
||||||
# Used for matrix spec type definitions and helpers
|
# Used for matrix spec type definitions and helpers
|
||||||
ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "f196f5b6f164973d6b343af31ab4e0457f743675" }
|
#ruma = { git = "https://github.com/ruma/ruma", features = ["rand", "appservice-api", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks", "unstable-exhaustive-types"], rev = "f196f5b6f164973d6b343af31ab4e0457f743675" }
|
||||||
#ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "push-gateway-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" }
|
#ruma = { git = "https://github.com/DevinR528/ruma", features = ["rand", "client-api", "federation-api", "push-gateway-api", "unstable-exhaustive-types", "unstable-pre-spec", "unstable-synapse-quirks"], branch = "verified-export" }
|
||||||
#ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
|
ruma = { path = "../ruma/ruma", features = ["unstable-exhaustive-types", "rand", "client-api", "federation-api", "push-gateway-api", "unstable-pre-spec", "unstable-synapse-quirks"] }
|
||||||
|
|
||||||
# Used when doing state resolution
|
# Used when doing state resolution
|
||||||
# state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] }
|
# state-res = { git = "https://github.com/timokoesters/state-res", branch = "timo-spec-comp", features = ["unstable-pre-spec"] }
|
||||||
# TODO: remove the gen-eventid feature
|
# TODO: remove the gen-eventid feature
|
||||||
#state-res = { git = "https://github.com/ruma/state-res", branch = "main", features = ["unstable-pre-spec", "gen-eventid"] }
|
#state-res = { git = "https://github.com/ruma/state-res", rev = "34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488", features = ["unstable-pre-spec", "gen-eventid"] }
|
||||||
state-res = { git = "https://github.com/ruma/state-res", rev = "34cd1cb4dcdd5fb84b5df9e48e63b2e4669a2488", features = ["unstable-pre-spec", "gen-eventid"] }
|
state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
|
||||||
#state-res = { path = "../state-res", features = ["unstable-pre-spec", "gen-eventid"] }
|
|
||||||
|
|
||||||
# Used for long polling and federation sender, should be the same as rocket::tokio
|
# Used for long polling and federation sender, should be the same as rocket::tokio
|
||||||
tokio = "1.2.0"
|
tokio = "1.2.0"
|
||||||
|
|
|
@ -689,7 +689,7 @@ pub async fn get_pushers_route(
|
||||||
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
|
||||||
|
|
||||||
Ok(get_pushers::Response {
|
Ok(get_pushers::Response {
|
||||||
pushers: db.pusher.get_pusher(sender_user)?,
|
pushers: db.pusher.get_pushers(sender_user)?,
|
||||||
}
|
}
|
||||||
.into())
|
.into())
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ use ruma::{
|
||||||
push::{Action, PushCondition, PushFormat, Ruleset, Tweak},
|
push::{Action, PushCondition, PushFormat, Ruleset, Tweak},
|
||||||
uint, UInt, UserId,
|
uint, UInt, UserId,
|
||||||
};
|
};
|
||||||
|
use sled::IVec;
|
||||||
|
|
||||||
use std::{convert::TryFrom, fmt::Debug};
|
use std::{convert::TryFrom, fmt::Debug};
|
||||||
|
|
||||||
|
@ -58,7 +59,17 @@ impl PushData {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_pusher(&self, sender: &UserId) -> Result<Vec<Pusher>> {
|
pub fn get_pusher(&self, senderkey: &[u8]) -> Result<Option<Pusher>> {
|
||||||
|
self.senderkey_pusher
|
||||||
|
.get(senderkey)?
|
||||||
|
.map(|push| {
|
||||||
|
Ok(serde_json::from_slice(&*push)
|
||||||
|
.map_err(|_| Error::bad_database("Invalid Pusher in db."))?)
|
||||||
|
})
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_pushers(&self, sender: &UserId) -> Result<Vec<Pusher>> {
|
||||||
let mut prefix = sender.as_bytes().to_vec();
|
let mut prefix = sender.as_bytes().to_vec();
|
||||||
prefix.push(0xff);
|
prefix.push(0xff);
|
||||||
|
|
||||||
|
@ -72,6 +83,16 @@ impl PushData {
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_pusher_senderkeys(&self, sender: &UserId) -> impl Iterator<Item = Result<IVec>> {
|
||||||
|
let mut prefix = sender.as_bytes().to_vec();
|
||||||
|
prefix.push(0xff);
|
||||||
|
|
||||||
|
self.senderkey_pusher
|
||||||
|
.scan_prefix(prefix)
|
||||||
|
.keys()
|
||||||
|
.map(|r| Ok(r?))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_request<T: OutgoingRequest>(
|
pub async fn send_request<T: OutgoingRequest>(
|
||||||
|
@ -155,7 +176,7 @@ where
|
||||||
pub async fn send_push_notice(
|
pub async fn send_push_notice(
|
||||||
user: &UserId,
|
user: &UserId,
|
||||||
unread: UInt,
|
unread: UInt,
|
||||||
pushers: &[Pusher],
|
pusher: &Pusher,
|
||||||
ruleset: Ruleset,
|
ruleset: Ruleset,
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
|
@ -194,7 +215,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -214,8 +235,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str())
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
.await?;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -246,7 +266,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str())
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str())
|
||||||
.await?;
|
.await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -272,7 +292,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str())
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str())
|
||||||
.await?;
|
.await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -289,7 +309,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -326,7 +346,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str())
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str())
|
||||||
.await?;
|
.await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -352,7 +372,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str())
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str())
|
||||||
.await?;
|
.await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -369,7 +389,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -385,7 +405,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -401,7 +421,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -415,7 +435,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -429,7 +449,7 @@ pub async fn send_push_notice(
|
||||||
_ => None,
|
_ => None,
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
send_notice(unread, pushers, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
send_notice(unread, pusher, tweaks, pdu, db, rule.rule_id.as_str()).await?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -442,27 +462,27 @@ pub async fn send_push_notice(
|
||||||
|
|
||||||
async fn send_notice(
|
async fn send_notice(
|
||||||
unread: UInt,
|
unread: UInt,
|
||||||
pushers: &[Pusher],
|
pusher: &Pusher,
|
||||||
tweaks: Vec<Tweak>,
|
tweaks: Vec<Tweak>,
|
||||||
event: &PduEvent,
|
event: &PduEvent,
|
||||||
db: &Database,
|
db: &Database,
|
||||||
name: &str,
|
name: &str,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (http, _emails): (Vec<&Pusher>, _) = pushers
|
// TODO: email
|
||||||
.iter()
|
if pusher.kind == Some(PusherKind::Http) {
|
||||||
.partition(|pusher| pusher.kind == Some(PusherKind::Http));
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// TODO:
|
// TODO:
|
||||||
// Two problems with this
|
// Two problems with this
|
||||||
// 1. if "event_id_only" is the only format kind it seems we should never add more info
|
// 1. if "event_id_only" is the only format kind it seems we should never add more info
|
||||||
// 2. can pusher/devices have conflicting formats
|
// 2. can pusher/devices have conflicting formats
|
||||||
for pusher in http {
|
|
||||||
let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
|
let event_id_only = pusher.data.format == Some(PushFormat::EventIdOnly);
|
||||||
let url = if let Some(url) = pusher.data.url.as_ref() {
|
let url = if let Some(url) = pusher.data.url.as_ref() {
|
||||||
url
|
url
|
||||||
} else {
|
} else {
|
||||||
error!("Http Pusher must have URL specified.");
|
error!("Http Pusher must have URL specified.");
|
||||||
continue;
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone());
|
let mut device = Device::new(pusher.app_id.clone(), pusher.pushkey.clone());
|
||||||
|
@ -530,10 +550,8 @@ async fn send_notice(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: email
|
// TODO: email
|
||||||
// for email in emails {}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,6 @@ pub struct Rooms {
|
||||||
impl Rooms {
|
impl Rooms {
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
/// Builds a StateMap by iterating over all keys that start
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
/// with state_hash, this gives the full state for the given state_hash.
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_full_ids(&self, shortstatehash: u64) -> Result<Vec<EventId>> {
|
pub fn state_full_ids(&self, shortstatehash: u64) -> Result<Vec<EventId>> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.stateid_shorteventid
|
.stateid_shorteventid
|
||||||
|
@ -107,7 +106,6 @@ impl Rooms {
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_full(
|
pub fn state_full(
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
|
@ -628,7 +626,25 @@ impl Rooms {
|
||||||
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
|
.insert(pdu.event_id.as_bytes(), &*pdu_id)?;
|
||||||
|
|
||||||
// See if the event matches any known pushers
|
// See if the event matches any known pushers
|
||||||
db.sending.send_push_pdu(&*pdu_id)?;
|
for user in db
|
||||||
|
.users
|
||||||
|
.iter()
|
||||||
|
.filter_map(|r| r.ok())
|
||||||
|
.filter(|user_id| db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false))
|
||||||
|
{
|
||||||
|
// Don't notify the user of their own events
|
||||||
|
if user == pdu.sender {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for senderkey in db
|
||||||
|
.pusher
|
||||||
|
.get_pusher_senderkeys(&user)
|
||||||
|
.filter_map(|r| r.ok())
|
||||||
|
{
|
||||||
|
db.sending.send_push_pdu(&*pdu_id, senderkey)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
match pdu.kind {
|
match pdu.kind {
|
||||||
EventType::RoomRedaction => {
|
EventType::RoomRedaction => {
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
convert::TryFrom,
|
convert::{TryFrom, TryInto},
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant, SystemTime},
|
time::{Duration, Instant, SystemTime},
|
||||||
|
@ -14,9 +14,9 @@ use log::{error, info, warn};
|
||||||
use ring::digest;
|
use ring::digest;
|
||||||
use rocket::futures::stream::{FuturesUnordered, StreamExt};
|
use rocket::futures::stream::{FuturesUnordered, StreamExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{appservice, federation, OutgoingRequest},
|
api::{appservice, client::r0::push::Pusher, federation, OutgoingRequest},
|
||||||
events::{push_rules, EventType},
|
events::{push_rules, EventType},
|
||||||
uint, ServerName, UInt,
|
uint, ServerName, UInt, UserId,
|
||||||
};
|
};
|
||||||
use sled::IVec;
|
use sled::IVec;
|
||||||
use tokio::{select, sync::Semaphore};
|
use tokio::{select, sync::Semaphore};
|
||||||
|
@ -24,14 +24,14 @@ use tokio::{select, sync::Semaphore};
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
pub enum OutgoingKind {
|
pub enum OutgoingKind {
|
||||||
Appservice(Box<ServerName>),
|
Appservice(Box<ServerName>),
|
||||||
Push(Vec<u8>),
|
Push(Vec<u8>, Vec<u8>), // user and pushkey
|
||||||
Normal(Box<ServerName>),
|
Normal(Box<ServerName>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Sending {
|
pub struct Sending {
|
||||||
/// The state for a given state hash.
|
/// The state for a given state hash.
|
||||||
pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)ServerName / UserId + PduId
|
pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
|
||||||
pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation)
|
pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = (+ / $)ServerName / UserId + PduId (pduid can be empty for reservation)
|
||||||
pub(super) maximum_requests: Arc<Semaphore>,
|
pub(super) maximum_requests: Arc<Semaphore>,
|
||||||
}
|
}
|
||||||
|
@ -85,9 +85,11 @@ impl Sending {
|
||||||
p.extend_from_slice(server.as_bytes());
|
p.extend_from_slice(server.as_bytes());
|
||||||
p
|
p
|
||||||
}
|
}
|
||||||
OutgoingKind::Push(id) => {
|
OutgoingKind::Push(user, pushkey) => {
|
||||||
let mut p = b"$".to_vec();
|
let mut p = b"$".to_vec();
|
||||||
p.extend_from_slice(&id);
|
p.extend_from_slice(&user);
|
||||||
|
p.push(0xff);
|
||||||
|
p.extend_from_slice(&pushkey);
|
||||||
p
|
p
|
||||||
}
|
}
|
||||||
OutgoingKind::Normal(server) => {
|
OutgoingKind::Normal(server) => {
|
||||||
|
@ -106,6 +108,7 @@ impl Sending {
|
||||||
|
|
||||||
let mut subscriber = servernamepduids.watch_prefix(b"");
|
let mut subscriber = servernamepduids.watch_prefix(b"");
|
||||||
loop {
|
loop {
|
||||||
|
println!(".");
|
||||||
select! {
|
select! {
|
||||||
Some(response) = futures.next() => {
|
Some(response) = futures.next() => {
|
||||||
match response {
|
match response {
|
||||||
|
@ -116,9 +119,11 @@ impl Sending {
|
||||||
p.extend_from_slice(server.as_bytes());
|
p.extend_from_slice(server.as_bytes());
|
||||||
p
|
p
|
||||||
}
|
}
|
||||||
OutgoingKind::Push(id) => {
|
OutgoingKind::Push(user, pushkey) => {
|
||||||
let mut p = b"$".to_vec();
|
let mut p = b"$".to_vec();
|
||||||
p.extend_from_slice(&id);
|
p.extend_from_slice(&user);
|
||||||
|
p.push(0xff);
|
||||||
|
p.extend_from_slice(&pushkey);
|
||||||
p
|
p
|
||||||
},
|
},
|
||||||
OutgoingKind::Normal(server) => {
|
OutgoingKind::Normal(server) => {
|
||||||
|
@ -179,9 +184,11 @@ impl Sending {
|
||||||
p.extend_from_slice(serv.as_bytes());
|
p.extend_from_slice(serv.as_bytes());
|
||||||
p
|
p
|
||||||
},
|
},
|
||||||
OutgoingKind::Push(id) => {
|
OutgoingKind::Push(user, pushkey) => {
|
||||||
let mut p = b"$".to_vec();
|
let mut p = b"$".to_vec();
|
||||||
p.extend_from_slice(&id);
|
p.extend_from_slice(&user);
|
||||||
|
p.push(0xff);
|
||||||
|
p.extend_from_slice(&pushkey);
|
||||||
p
|
p
|
||||||
},
|
},
|
||||||
OutgoingKind::Normal(serv) => {
|
OutgoingKind::Normal(serv) => {
|
||||||
|
@ -208,7 +215,6 @@ impl Sending {
|
||||||
Some(event) = &mut subscriber => {
|
Some(event) = &mut subscriber => {
|
||||||
if let sled::Event::Insert { key, .. } = event {
|
if let sled::Event::Insert { key, .. } = event {
|
||||||
let servernamepduid = key.clone();
|
let servernamepduid = key.clone();
|
||||||
let mut parts = servernamepduid.splitn(2, |&b| b == 0xff);
|
|
||||||
|
|
||||||
let exponential_backoff = |(tries, instant): &(u32, Instant)| {
|
let exponential_backoff = |(tries, instant): &(u32, Instant)| {
|
||||||
// Fail if a request has failed recently (exponential backoff)
|
// Fail if a request has failed recently (exponential backoff)
|
||||||
|
@ -219,33 +225,8 @@ impl Sending {
|
||||||
|
|
||||||
instant.elapsed() < min_elapsed_duration
|
instant.elapsed() < min_elapsed_duration
|
||||||
};
|
};
|
||||||
if let Some((outgoing_kind, pdu_id)) = utils::string_from_bytes(
|
|
||||||
parts
|
if let Some((outgoing_kind, pdu_id)) = Self::parse_servercurrentpdus(&servernamepduid)
|
||||||
.next()
|
|
||||||
.expect("splitn will always return 1 or more elements"),
|
|
||||||
)
|
|
||||||
.map_err(|_| Error::bad_database("[Utf8] ServerName in servernamepduid bytes are invalid."))
|
|
||||||
.and_then(|ident_str| {
|
|
||||||
// Appservices start with a plus
|
|
||||||
Ok(if ident_str.starts_with('+') {
|
|
||||||
OutgoingKind::Appservice(
|
|
||||||
Box::<ServerName>::try_from(&ident_str[1..])
|
|
||||||
.map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))?
|
|
||||||
)
|
|
||||||
} else if ident_str.starts_with('$') {
|
|
||||||
OutgoingKind::Push(ident_str[1..].as_bytes().to_vec())
|
|
||||||
} else {
|
|
||||||
OutgoingKind::Normal(
|
|
||||||
Box::<ServerName>::try_from(ident_str)
|
|
||||||
.map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid."))?
|
|
||||||
)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.and_then(|outgoing_kind| parts
|
|
||||||
.next()
|
|
||||||
.ok_or_else(|| Error::bad_database("Invalid servernamepduid in db."))
|
|
||||||
.map(|pdu_id| (outgoing_kind, pdu_id))
|
|
||||||
)
|
|
||||||
.ok()
|
.ok()
|
||||||
.filter(|(outgoing_kind, _)| {
|
.filter(|(outgoing_kind, _)| {
|
||||||
if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) {
|
if last_failed_try.get(outgoing_kind).map_or(false, exponential_backoff) {
|
||||||
|
@ -258,9 +239,11 @@ impl Sending {
|
||||||
p.extend_from_slice(serv.as_bytes());
|
p.extend_from_slice(serv.as_bytes());
|
||||||
p
|
p
|
||||||
},
|
},
|
||||||
OutgoingKind::Push(id) => {
|
OutgoingKind::Push(user, pushkey) => {
|
||||||
let mut p = b"$".to_vec();
|
let mut p = b"$".to_vec();
|
||||||
p.extend_from_slice(&id);
|
p.extend_from_slice(&user);
|
||||||
|
p.push(0xff);
|
||||||
|
p.extend_from_slice(&pushkey);
|
||||||
p
|
p
|
||||||
},
|
},
|
||||||
OutgoingKind::Normal(serv) => {
|
OutgoingKind::Normal(serv) => {
|
||||||
|
@ -279,6 +262,8 @@ impl Sending {
|
||||||
servercurrentpdus.insert(&key, &[]).unwrap();
|
servercurrentpdus.insert(&key, &[]).unwrap();
|
||||||
servernamepduids.remove(&key).unwrap();
|
servernamepduids.remove(&key).unwrap();
|
||||||
|
|
||||||
|
dbg!("there is a future");
|
||||||
|
|
||||||
futures.push(
|
futures.push(
|
||||||
Self::handle_event(
|
Self::handle_event(
|
||||||
outgoing_kind,
|
outgoing_kind,
|
||||||
|
@ -295,15 +280,9 @@ impl Sending {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn send_push_pdu(&self, pdu_id: &[u8]) -> Result<()> {
|
pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: IVec) -> Result<()> {
|
||||||
// Make sure we don't cause utf8 errors when parsing to a String...
|
|
||||||
let pduid = String::from_utf8_lossy(pdu_id).as_bytes().to_vec();
|
|
||||||
|
|
||||||
// these are valid ServerName chars
|
|
||||||
// (byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'.')
|
|
||||||
let mut key = b"$".to_vec();
|
let mut key = b"$".to_vec();
|
||||||
// keep each pdu push unique
|
key.extend_from_slice(&senderkey);
|
||||||
key.extend_from_slice(pduid.as_slice());
|
|
||||||
key.push(0xff);
|
key.push(0xff);
|
||||||
key.extend_from_slice(pdu_id);
|
key.extend_from_slice(pdu_id);
|
||||||
self.servernamepduids.insert(key, b"")?;
|
self.servernamepduids.insert(key, b"")?;
|
||||||
|
@ -313,6 +292,7 @@ impl Sending {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> {
|
pub fn send_pdu(&self, server: &ServerName, pdu_id: &[u8]) -> Result<()> {
|
||||||
|
dbg!(&server);
|
||||||
let mut key = server.as_bytes().to_vec();
|
let mut key = server.as_bytes().to_vec();
|
||||||
key.push(0xff);
|
key.push(0xff);
|
||||||
key.extend_from_slice(pdu_id);
|
key.extend_from_slice(pdu_id);
|
||||||
|
@ -369,6 +349,8 @@ impl Sending {
|
||||||
.filter_map(|r| r.ok())
|
.filter_map(|r| r.ok())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let permit = db.sending.maximum_requests.acquire().await;
|
let permit = db.sending.maximum_requests.acquire().await;
|
||||||
|
|
||||||
|
error!("sending pdus to {}: {:#?}", server, pdu_jsons);
|
||||||
let response = appservice_server::send_request(
|
let response = appservice_server::send_request(
|
||||||
&db.globals,
|
&db.globals,
|
||||||
db.appservice
|
db.appservice
|
||||||
|
@ -391,17 +373,17 @@ impl Sending {
|
||||||
|
|
||||||
response
|
response
|
||||||
}
|
}
|
||||||
OutgoingKind::Push(id) => {
|
OutgoingKind::Push(user, pushkey) => {
|
||||||
let pdus = pdu_ids
|
let pdus = pdu_ids
|
||||||
.iter()
|
.iter()
|
||||||
.map(|pdu_id| {
|
.map(|pdu_id| {
|
||||||
Ok::<_, (Vec<u8>, Error)>(
|
Ok::<_, (Vec<u8>, Error)>(
|
||||||
db.rooms
|
db.rooms
|
||||||
.get_pdu_from_id(pdu_id)
|
.get_pdu_from_id(pdu_id)
|
||||||
.map_err(|e| (id.clone(), e))?
|
.map_err(|e| (pushkey.clone(), e))?
|
||||||
.ok_or_else(|| {
|
.ok_or_else(|| {
|
||||||
(
|
(
|
||||||
id.clone(),
|
pushkey.clone(),
|
||||||
Error::bad_database(
|
Error::bad_database(
|
||||||
"[Push] Event in servernamepduids not found in db.",
|
"[Push] Event in servernamepduids not found in db.",
|
||||||
),
|
),
|
||||||
|
@ -418,35 +400,48 @@ impl Sending {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
for user in db.users.iter().filter_map(|r| r.ok()).filter(|user_id| {
|
let userid = UserId::try_from(utils::string_from_bytes(user).map_err(|e| {
|
||||||
db.rooms.is_joined(&user_id, &pdu.room_id).unwrap_or(false)
|
(
|
||||||
}) {
|
OutgoingKind::Push(user.clone(), pushkey.clone()),
|
||||||
// Don't notify the user of their own events
|
Error::bad_database("Invalid push user string in db."),
|
||||||
if user == pdu.sender {
|
)
|
||||||
continue;
|
})?)
|
||||||
}
|
.map_err(|e| {
|
||||||
|
(
|
||||||
|
OutgoingKind::Push(user.clone(), pushkey.clone()),
|
||||||
|
Error::bad_database("Invalid push user id in db."),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
let pushers = db
|
let mut senderkey = user.clone();
|
||||||
|
senderkey.push(0xff);
|
||||||
|
senderkey.extend_from_slice(pushkey);
|
||||||
|
|
||||||
|
let pusher = match db
|
||||||
.pusher
|
.pusher
|
||||||
.get_pusher(&user)
|
.get_pusher(&senderkey)
|
||||||
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
|
.map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))?
|
||||||
|
{
|
||||||
|
Some(pusher) => pusher,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
|
||||||
let rules_for_user = db
|
let rules_for_user = db
|
||||||
.account_data
|
.account_data
|
||||||
.get::<push_rules::PushRulesEvent>(None, &user, EventType::PushRules)
|
.get::<push_rules::PushRulesEvent>(None, &userid, EventType::PushRules)
|
||||||
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?
|
.map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))?
|
||||||
.map(|ev| ev.content.global)
|
.map(|ev| ev.content.global)
|
||||||
.unwrap_or_else(|| crate::push_rules::default_pushrules(&user));
|
.unwrap_or_else(|| crate::push_rules::default_pushrules(&userid));
|
||||||
|
|
||||||
let unread: UInt = if let Some(last_read) = db
|
let unread: UInt = if let Some(last_read) = db
|
||||||
.rooms
|
.rooms
|
||||||
.edus
|
.edus
|
||||||
.private_read_get(&pdu.room_id, &user)
|
.private_read_get(&pdu.room_id, &userid)
|
||||||
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?
|
.map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))?
|
||||||
{
|
{
|
||||||
(db.rooms
|
(db.rooms
|
||||||
.pdus_since(&user, &pdu.room_id, last_read)
|
.pdus_since(&userid, &pdu.room_id, last_read)
|
||||||
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?
|
.map_err(|e| (OutgoingKind::Push(user.clone(), pushkey.clone()), e))?
|
||||||
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
|
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
|
||||||
.filter(|(_, pdu)| {
|
.filter(|(_, pdu)| {
|
||||||
matches!(
|
matches!(
|
||||||
|
@ -462,10 +457,12 @@ impl Sending {
|
||||||
};
|
};
|
||||||
|
|
||||||
let permit = db.sending.maximum_requests.acquire().await;
|
let permit = db.sending.maximum_requests.acquire().await;
|
||||||
|
|
||||||
|
error!("sending pdu to {}: {:#?}", userid, pdu);
|
||||||
let _response = pusher::send_push_notice(
|
let _response = pusher::send_push_notice(
|
||||||
&user,
|
&userid,
|
||||||
unread,
|
unread,
|
||||||
&pushers,
|
&pusher,
|
||||||
rules_for_user,
|
rules_for_user,
|
||||||
&pdu,
|
&pdu,
|
||||||
db,
|
db,
|
||||||
|
@ -476,8 +473,7 @@ impl Sending {
|
||||||
|
|
||||||
drop(permit);
|
drop(permit);
|
||||||
}
|
}
|
||||||
}
|
Ok(OutgoingKind::Push(user.clone(), pushkey.clone()))
|
||||||
Ok(OutgoingKind::Push(id.clone()))
|
|
||||||
}
|
}
|
||||||
OutgoingKind::Normal(server) => {
|
OutgoingKind::Normal(server) => {
|
||||||
let pdu_jsons = pdu_ids
|
let pdu_jsons = pdu_ids
|
||||||
|
@ -540,30 +536,49 @@ impl Sending {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_servercurrentpdus(key: &IVec) -> Result<(OutgoingKind, IVec)> {
|
fn parse_servercurrentpdus(key: &IVec) -> Result<(OutgoingKind, IVec)> {
|
||||||
let mut parts = key.splitn(2, |&b| b == 0xff);
|
// Appservices start with a plus
|
||||||
|
Ok::<_, Error>(if key.starts_with(b"+") {
|
||||||
|
let mut parts = key[1..].splitn(2, |&b| b == 0xff);
|
||||||
|
|
||||||
let server = parts.next().expect("splitn always returns one element");
|
let server = parts.next().expect("splitn always returns one element");
|
||||||
let pdu = parts
|
let pdu = parts
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
||||||
|
|
||||||
let server = utils::string_from_bytes(&server).map_err(|_| {
|
let server = utils::string_from_bytes(&server).map_err(|_| {
|
||||||
Error::bad_database("Invalid server bytes in server_currenttransaction")
|
Error::bad_database("Invalid server bytes in server_currenttransaction")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Appservices start with a plus
|
|
||||||
Ok::<_, Error>(if server.starts_with('+') {
|
|
||||||
(
|
(
|
||||||
OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| {
|
OutgoingKind::Appservice(Box::<ServerName>::try_from(server).map_err(|_| {
|
||||||
Error::bad_database("Invalid server string in server_currenttransaction")
|
Error::bad_database("Invalid server string in server_currenttransaction")
|
||||||
})?),
|
})?),
|
||||||
IVec::from(pdu),
|
IVec::from(pdu),
|
||||||
)
|
)
|
||||||
} else if server.starts_with('$') {
|
} else if key.starts_with(b"$") {
|
||||||
|
let mut parts = key[1..].splitn(3, |&b| b == 0xff);
|
||||||
|
|
||||||
|
let user = parts.next().expect("splitn always returns one element");
|
||||||
|
let pushkey = parts
|
||||||
|
.next()
|
||||||
|
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
||||||
|
let pdu = parts
|
||||||
|
.next()
|
||||||
|
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
||||||
(
|
(
|
||||||
OutgoingKind::Push(server.as_bytes().to_vec()),
|
OutgoingKind::Push(user.to_vec(), pushkey.to_vec()),
|
||||||
IVec::from(pdu),
|
IVec::from(pdu),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
let mut parts = key.splitn(2, |&b| b == 0xff);
|
||||||
|
|
||||||
|
let server = parts.next().expect("splitn always returns one element");
|
||||||
|
let pdu = parts
|
||||||
|
.next()
|
||||||
|
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
||||||
|
let server = utils::string_from_bytes(&server).map_err(|_| {
|
||||||
|
Error::bad_database("Invalid server bytes in server_currenttransaction")
|
||||||
|
})?;
|
||||||
|
|
||||||
(
|
(
|
||||||
OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| {
|
OutgoingKind::Normal(Box::<ServerName>::try_from(server).map_err(|_| {
|
||||||
Error::bad_database("Invalid server string in server_currenttransaction")
|
Error::bad_database("Invalid server string in server_currenttransaction")
|
||||||
|
|
|
@ -21,9 +21,10 @@ use ruma::{
|
||||||
},
|
},
|
||||||
directory::{IncomingFilter, IncomingRoomNetwork},
|
directory::{IncomingFilter, IncomingRoomNetwork},
|
||||||
events::EventType,
|
events::EventType,
|
||||||
|
identifiers::{KeyId, KeyName},
|
||||||
serde::to_canonical_value,
|
serde::to_canonical_value,
|
||||||
signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap},
|
signatures::{CanonicalJsonObject, CanonicalJsonValue, PublicKeyMap},
|
||||||
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
|
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, SigningKeyAlgorithm, UserId,
|
||||||
};
|
};
|
||||||
use state_res::{Event, EventMap, StateMap};
|
use state_res::{Event, EventMap, StateMap};
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -600,7 +601,7 @@ pub async fn send_transaction_message_route<'a>(
|
||||||
// discard the event whereas the Client Server API's /send/{eventType} endpoint
|
// discard the event whereas the Client Server API's /send/{eventType} endpoint
|
||||||
// would return a M_BAD_JSON error.
|
// would return a M_BAD_JSON error.
|
||||||
'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve {
|
'main_pdu_loop: for (event_id, _room_id, value) in pdus_to_resolve {
|
||||||
debug!("Working on incoming pdu: {:?}", value);
|
info!("Working on incoming pdu: {:?}", value);
|
||||||
let server_name = &body.body.origin;
|
let server_name = &body.body.origin;
|
||||||
let mut pub_key_map = BTreeMap::new();
|
let mut pub_key_map = BTreeMap::new();
|
||||||
|
|
||||||
|
@ -639,7 +640,7 @@ pub async fn send_transaction_message_route<'a>(
|
||||||
|
|
||||||
// 6. persist the event as an outlier.
|
// 6. persist the event as an outlier.
|
||||||
db.rooms.add_pdu_outlier(&pdu)?;
|
db.rooms.add_pdu_outlier(&pdu)?;
|
||||||
debug!("Added pdu as outlier.");
|
info!("Added pdu as outlier.");
|
||||||
|
|
||||||
// Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
|
// Step 9. fetch missing state by calling /state_ids at backwards extremities doing all
|
||||||
// the checks in this list starting at 1. These are not timeline events.
|
// the checks in this list starting at 1. These are not timeline events.
|
||||||
|
@ -914,7 +915,7 @@ pub async fn send_transaction_message_route<'a>(
|
||||||
// We use the `state_at_event` instead of `state_after` so we accurately
|
// We use the `state_at_event` instead of `state_after` so we accurately
|
||||||
// represent the state for this event.
|
// represent the state for this event.
|
||||||
append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
|
append_incoming_pdu(&db, &pdu, &extremities, &state_at_event)?;
|
||||||
debug!("Appended incoming pdu.");
|
info!("Appended incoming pdu.");
|
||||||
|
|
||||||
// Set the new room state to the resolved state
|
// Set the new room state to the resolved state
|
||||||
update_resolved_state(
|
update_resolved_state(
|
||||||
|
@ -961,21 +962,31 @@ fn validate_event<'a>(
|
||||||
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
|
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
|
||||||
) -> AsyncRecursiveResult<'a, (Arc<PduEvent>, Option<Arc<PduEvent>>)> {
|
) -> AsyncRecursiveResult<'a, (Arc<PduEvent>, Option<Arc<PduEvent>>)> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
for signature_server in match value
|
for (signature_server, signature) in match value
|
||||||
.get("signatures")
|
.get("signatures")
|
||||||
.ok_or_else(|| "No signatures in server response pdu.".to_string())?
|
.ok_or_else(|| "No signatures in server response pdu.".to_string())?
|
||||||
{
|
{
|
||||||
CanonicalJsonValue::Object(map) => map,
|
CanonicalJsonValue::Object(map) => map,
|
||||||
_ => return Err("Invalid signatures object in server response pdu.".to_string()),
|
_ => return Err("Invalid signatures object in server response pdu.".to_string()),
|
||||||
|
} {
|
||||||
|
let signature_object = match signature {
|
||||||
|
CanonicalJsonValue::Object(map) => map,
|
||||||
|
_ => {
|
||||||
|
return Err(
|
||||||
|
"Invalid signatures content object in server response pdu.".to_string()
|
||||||
|
)
|
||||||
}
|
}
|
||||||
.keys()
|
};
|
||||||
{
|
|
||||||
|
let signature_ids = signature_object.keys().collect::<Vec<_>>();
|
||||||
|
|
||||||
debug!("Fetching signing keys for {}", signature_server);
|
debug!("Fetching signing keys for {}", signature_server);
|
||||||
let keys = match fetch_signing_keys(
|
let keys = match fetch_signing_keys(
|
||||||
&db,
|
&db,
|
||||||
&Box::<ServerName>::try_from(&**signature_server).map_err(|_| {
|
&Box::<ServerName>::try_from(&**signature_server).map_err(|_| {
|
||||||
"Invalid servername in signatures of server response pdu.".to_string()
|
"Invalid servername in signatures of server response pdu.".to_string()
|
||||||
})?,
|
})?,
|
||||||
|
signature_ids,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
@ -987,11 +998,14 @@ fn validate_event<'a>(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
pub_key_map.insert(signature_server.clone(), keys);
|
pub_key_map.insert(dbg!(signature_server.clone()), dbg!(keys));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut val =
|
let mut val = match ruma::signatures::verify_event(
|
||||||
match ruma::signatures::verify_event(pub_key_map, &value, &RoomVersionId::Version5) {
|
dbg!(&pub_key_map),
|
||||||
|
&value,
|
||||||
|
&RoomVersionId::Version5,
|
||||||
|
) {
|
||||||
Ok(ver) => {
|
Ok(ver) => {
|
||||||
if let ruma::signatures::Verified::Signatures = ver {
|
if let ruma::signatures::Verified::Signatures = ver {
|
||||||
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
|
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
|
||||||
|
@ -1116,7 +1130,7 @@ pub(crate) async fn fetch_events(
|
||||||
Arc::new(pdu)
|
Arc::new(pdu)
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
debug!("Fetching event over federation");
|
debug!("Fetching event over federation: {:?}", id);
|
||||||
match db
|
match db
|
||||||
.sending
|
.sending
|
||||||
.send_federation_request(
|
.send_federation_request(
|
||||||
|
@ -1159,38 +1173,53 @@ pub(crate) async fn fetch_events(
|
||||||
pub(crate) async fn fetch_signing_keys(
|
pub(crate) async fn fetch_signing_keys(
|
||||||
db: &Database,
|
db: &Database,
|
||||||
origin: &ServerName,
|
origin: &ServerName,
|
||||||
|
signature_ids: Vec<&String>,
|
||||||
) -> Result<BTreeMap<String, String>> {
|
) -> Result<BTreeMap<String, String>> {
|
||||||
let mut result = BTreeMap::new();
|
let contains_all_ids = |keys: &BTreeMap<String, String>| {
|
||||||
|
signature_ids
|
||||||
|
.iter()
|
||||||
|
.all(|&id| dbg!(dbg!(&keys).contains_key(dbg!(id))))
|
||||||
|
};
|
||||||
|
|
||||||
match db.globals.signing_keys_for(origin)? {
|
let mut result = db
|
||||||
keys if !keys.is_empty() => Ok(keys
|
.globals
|
||||||
|
.signing_keys_for(origin)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (k.to_string(), v.key))
|
.map(|(k, v)| (k.to_string(), v.key))
|
||||||
.collect()),
|
.collect::<BTreeMap<_, _>>();
|
||||||
_ => {
|
|
||||||
match db
|
if contains_all_ids(&result) {
|
||||||
|
return Ok(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Ok(get_keys_response) = db
|
||||||
.sending
|
.sending
|
||||||
.send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
|
.send_federation_request(&db.globals, origin, get_server_keys::v2::Request::new())
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(keys) => {
|
db.globals
|
||||||
db.globals.add_signing_key(origin, &keys.server_key)?;
|
.add_signing_key(origin, &get_keys_response.server_key)?;
|
||||||
|
|
||||||
result.extend(
|
result.extend(
|
||||||
keys.server_key
|
get_keys_response
|
||||||
|
.server_key
|
||||||
.verify_keys
|
.verify_keys
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (k.to_string(), v.key)),
|
.map(|(k, v)| (k.to_string(), v.key)),
|
||||||
);
|
);
|
||||||
result.extend(
|
result.extend(
|
||||||
keys.server_key
|
get_keys_response
|
||||||
|
.server_key
|
||||||
.old_verify_keys
|
.old_verify_keys
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, v)| (k.to_string(), v.key)),
|
.map(|(k, v)| (k.to_string(), v.key)),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if contains_all_ids(&result) {
|
||||||
return Ok(result);
|
return Ok(result);
|
||||||
}
|
}
|
||||||
_ => {
|
}
|
||||||
|
|
||||||
for server in db.globals.trusted_servers() {
|
for server in db.globals.trusted_servers() {
|
||||||
debug!("Asking {} for {}'s signing key", server, origin);
|
debug!("Asking {} for {}'s signing key", server, origin);
|
||||||
if let Ok(keys) = db
|
if let Ok(keys) = db
|
||||||
|
@ -1221,16 +1250,16 @@ pub(crate) async fn fetch_signing_keys(
|
||||||
.map(|(k, v)| (k.to_string(), v.key)),
|
.map(|(k, v)| (k.to_string(), v.key)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if contains_all_ids(&result) {
|
||||||
return Ok(result);
|
return Ok(result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Err(Error::BadServerResponse(
|
Err(Error::BadServerResponse(
|
||||||
"Failed to find public key for server",
|
"Failed to find public key for server",
|
||||||
))
|
))
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gather all state snapshots needed to resolve the current state of the room.
|
/// Gather all state snapshots needed to resolve the current state of the room.
|
||||||
|
@ -1244,7 +1273,7 @@ pub(crate) async fn calculate_forward_extremities(
|
||||||
db: &Database,
|
db: &Database,
|
||||||
pdu: &PduEvent,
|
pdu: &PduEvent,
|
||||||
) -> Result<Vec<EventId>> {
|
) -> Result<Vec<EventId>> {
|
||||||
let mut current_leaves = db.rooms.get_pdu_leaves(pdu.room_id())?;
|
let mut current_leaves = dbg!(db.rooms.get_pdu_leaves(pdu.room_id())?);
|
||||||
|
|
||||||
let mut is_incoming_leaf = true;
|
let mut is_incoming_leaf = true;
|
||||||
// Make sure the incoming event is not already a forward extremity
|
// Make sure the incoming event is not already a forward extremity
|
||||||
|
@ -1290,7 +1319,6 @@ pub(crate) async fn calculate_forward_extremities(
|
||||||
///
|
///
|
||||||
/// This guarantees that the incoming event will be in the state sets (at least our servers
|
/// This guarantees that the incoming event will be in the state sets (at least our servers
|
||||||
/// and the sending server).
|
/// and the sending server).
|
||||||
#[tracing::instrument(skip(db))]
|
|
||||||
pub(crate) async fn build_forward_extremity_snapshots(
|
pub(crate) async fn build_forward_extremity_snapshots(
|
||||||
db: &Database,
|
db: &Database,
|
||||||
pdu: Arc<PduEvent>,
|
pdu: Arc<PduEvent>,
|
||||||
|
@ -1316,7 +1344,7 @@ pub(crate) async fn build_forward_extremity_snapshots(
|
||||||
Some(leave_pdu) => {
|
Some(leave_pdu) => {
|
||||||
let pdu_shortstatehash = db
|
let pdu_shortstatehash = db
|
||||||
.rooms
|
.rooms
|
||||||
.pdu_shortstatehash(&leave_pdu.event_id)?
|
.pdu_shortstatehash(dbg!(&leave_pdu.event_id))?
|
||||||
.ok_or_else(|| Error::bad_database("Found pdu with no statehash in db."))?;
|
.ok_or_else(|| Error::bad_database("Found pdu with no statehash in db."))?;
|
||||||
|
|
||||||
if current_shortstatehash.as_ref() == Some(&pdu_shortstatehash) {
|
if current_shortstatehash.as_ref() == Some(&pdu_shortstatehash) {
|
||||||
|
@ -1367,7 +1395,9 @@ pub(crate) fn update_resolved_state(
|
||||||
new_state.insert(
|
new_state.insert(
|
||||||
(
|
(
|
||||||
ev_type,
|
ev_type,
|
||||||
state_k.ok_or_else(|| Error::Conflict("State contained non state event"))?,
|
state_k.ok_or_else(|| {
|
||||||
|
Error::Conflict("update_resolved_state: State contained non state event")
|
||||||
|
})?,
|
||||||
),
|
),
|
||||||
pdu.event_id.clone(),
|
pdu.event_id.clone(),
|
||||||
);
|
);
|
||||||
|
@ -1395,9 +1425,9 @@ pub(crate) fn append_incoming_pdu(
|
||||||
new_state.insert(
|
new_state.insert(
|
||||||
(
|
(
|
||||||
ev_type.clone(),
|
ev_type.clone(),
|
||||||
state_k
|
state_k.clone().ok_or_else(|| {
|
||||||
.clone()
|
Error::Conflict("append_incoming_pdu: State contained non state event")
|
||||||
.ok_or_else(|| Error::Conflict("State contained non state event"))?,
|
})?,
|
||||||
),
|
),
|
||||||
state_pdu.event_id.clone(),
|
state_pdu.event_id.clone(),
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue