rebroadcast handle, identity, account events

main
Charlotte Som 2024-11-25 23:30:13 +02:00
parent 642abf6aa5
commit 88ac43b18c
4 changed files with 169 additions and 40 deletions

View File

@ -4,7 +4,7 @@ use anyhow::{bail, Result};
use atrium_api::com::atproto::sync::subscribe_repos; use atrium_api::com::atproto::sync::subscribe_repos;
use bytes::Bytes; use bytes::Bytes;
use fastwebsockets::{FragmentCollector, OpCode, Payload, WebSocketError}; use fastwebsockets::{FragmentCollector, OpCode, Payload, WebSocketError};
use hyper::{header, upgrade::Upgraded, Request, Uri}; use hyper::{header, upgrade::Upgraded, Request};
use hyper_util::rt::{TokioExecutor, TokioIo}; use hyper_util::rt::{TokioExecutor, TokioIo};
use ipld_core::ipld::Ipld; use ipld_core::ipld::Ipld;
use serde_ipld_dagcbor::DecodeError; use serde_ipld_dagcbor::DecodeError;
@ -14,7 +14,7 @@ use tokio_rustls::rustls::pki_types::ServerName;
use crate::{ use crate::{
http::body_empty, http::body_empty,
tls::open_tls_stream, tls::open_tls_stream,
user::lookup_user, user::{fetch_user, lookup_user},
wire_proto::{StreamEvent, StreamEventHeader, StreamEventPayload}, wire_proto::{StreamEvent, StreamEventHeader, StreamEventPayload},
RelayServer, RelayServer,
}; };
@ -42,9 +42,8 @@ async fn create_ws_client(
.header("Sec-WebSocket-Version", "13") .header("Sec-WebSocket-Version", "13")
.body(body_empty())?; .body(body_empty())?;
let (mut ws, _) = fastwebsockets::handshake::client(&TokioExecutor::new(), req, tls_stream) let (mut ws, _) =
.await fastwebsockets::handshake::client(&TokioExecutor::new(), req, tls_stream).await?;
.unwrap();
ws.set_auto_pong(true); ws.set_auto_pong(true);
ws.set_auto_close(true); ws.set_auto_close(true);
@ -70,33 +69,134 @@ impl DataServerSubscription {
async fn handle_commit( async fn handle_commit(
&mut self, &mut self,
payload: subscribe_repos::Commit, event: subscribe_repos::Commit,
) -> Result<Option<StreamEventPayload>> { ) -> Result<Option<StreamEventPayload>> {
let user = lookup_user(&self.server, &payload.repo).await?; let last_seq = self.last_seq.unwrap_or_default();
let Some(pds) = user.pds else { if event.seq < last_seq {
bail!("user has no associated pds? {:?}", user);
};
let uri: Uri = pds.parse()?;
if uri.authority().map(|a| a.host()) != Some(&self.host) {
bail!( bail!(
"commit from non-authoritative pds (got {} expected {})", "got event out of order from stream (seq = {}, prev = {})",
event.seq,
last_seq
)
}
self.last_seq = Some(event.seq);
let mut user = lookup_user(&self.server, &event.repo).await?;
let pds = user.pds.as_deref().unwrap_or_default();
if pds != self.host {
tracing::warn!(
"received event from different pds than expected (got {} expected {})",
self.host, self.host,
pds pds
); );
// re-fetch user (without cache)
user = fetch_user(&self.server, &event.repo).await?;
let fresh_pds = user.pds.as_deref().unwrap_or_default();
if fresh_pds != self.host {
bail!(
"commit from non-authoritative pds (got {} expected {})",
self.host,
fresh_pds
);
}
} }
if user.takedown { // TODO: lookup did in takedown db tree
tracing::debug!(did = %user.did, seq = %payload.seq, "dropping commit event from taken-down user"); let takedown = false;
if takedown {
tracing::debug!(did = %user.did, seq = %event.seq, "dropping commit event from taken-down user");
return Ok(None); return Ok(None);
} }
self.last_seq = Some(payload.seq); if event.rebase {
Ok(Some(StreamEventPayload::Commit(payload))) tracing::debug!(did = %user.did, seq = %event.seq, "dropping commit event with rebase flag");
return Ok(None);
}
Ok(Some(StreamEventPayload::Commit(event)))
}
async fn handle_handle(
&mut self,
event: subscribe_repos::Handle,
) -> Result<Option<StreamEventPayload>> {
let last_seq = self.last_seq.unwrap_or_default();
if event.seq < last_seq {
bail!(
"got event out of order from stream (seq = {}, prev = {})",
event.seq,
last_seq
)
}
self.last_seq = Some(event.seq);
let user = fetch_user(&self.server, &event.did).await?;
if user.handle.as_deref() != Some(event.handle.as_str()) {
tracing::warn!(
seq = %event.seq,
expected = ?event.handle.as_str(),
got = ?user.handle,
"handle update did not update handle to asserted value"
);
}
Ok(Some(StreamEventPayload::Handle(event)))
}
async fn handle_identity(
&mut self,
event: subscribe_repos::Identity,
) -> Result<Option<StreamEventPayload>> {
let last_seq = self.last_seq.unwrap_or_default();
if event.seq < last_seq {
bail!(
"got event out of order from stream (seq = {}, prev = {})",
event.seq,
last_seq
)
}
self.last_seq = Some(event.seq);
Ok(Some(StreamEventPayload::Identity(event)))
}
async fn handle_account(
&mut self,
mut event: subscribe_repos::Account,
) -> Result<Option<StreamEventPayload>> {
let last_seq = self.last_seq.unwrap_or_default();
if event.seq < last_seq {
bail!(
"got event out of order from stream (seq = {}, prev = {})",
event.seq,
last_seq
)
}
self.last_seq = Some(event.seq);
let user = fetch_user(&self.server, &event.did).await?;
let pds = user.pds.as_deref().unwrap_or_default();
if pds != self.host {
bail!(
"account event from non-authoritative pds (got {} expected {})",
pds,
&self.host
)
}
// TODO: handle takedowns
let takedown = false;
if takedown {
event.status = Some("takendown".into());
event.active = false;
}
Ok(Some(StreamEventPayload::Account(event)))
} }
async fn handle_event(&mut self, frame: Bytes) -> Result<()> { async fn handle_event(&mut self, frame: Bytes) -> Result<()> {
// TODO: validate if this message is valid to come from this host
let buf: &[u8] = &frame; let buf: &[u8] = &frame;
let mut cursor = Cursor::new(buf); let mut cursor = Cursor::new(buf);
let (header_buf, payload_buf) = let (header_buf, payload_buf) =
@ -112,12 +212,22 @@ impl DataServerSubscription {
serde_ipld_dagcbor::from_slice::<subscribe_repos::Commit>(payload_buf)?; serde_ipld_dagcbor::from_slice::<subscribe_repos::Commit>(payload_buf)?;
self.handle_commit(payload).await? self.handle_commit(payload).await?
} }
Some("#handle") => { Some("#handle") => {
// TODO let payload =
None serde_ipld_dagcbor::from_slice::<subscribe_repos::Handle>(payload_buf)?;
self.handle_handle(payload).await?
} }
Some("#identity") => {
let payload =
serde_ipld_dagcbor::from_slice::<subscribe_repos::Identity>(payload_buf)?;
self.handle_identity(payload).await?
}
Some("#account") => {
let payload =
serde_ipld_dagcbor::from_slice::<subscribe_repos::Account>(payload_buf)?;
self.handle_account(payload).await?
}
// TODO: migrate, tombstone
Some("#info") => { Some("#info") => {
let payload = serde_ipld_dagcbor::from_slice::<subscribe_repos::Info>(payload_buf)?; let payload = serde_ipld_dagcbor::from_slice::<subscribe_repos::Info>(payload_buf)?;
if payload.name == "OutdatedCursor" { if payload.name == "OutdatedCursor" {
@ -126,7 +236,6 @@ impl DataServerSubscription {
None None
} }
Some(t) => { Some(t) => {
tracing::warn!("dropped unknown message type '{}'", t); tracing::warn!("dropped unknown message type '{}'", t);
None None

View File

@ -53,6 +53,18 @@ async fn run_sequencer(
payload.seq = curr_seq as i64; payload.seq = curr_seq as i64;
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
} }
StreamEventPayload::Handle(mut payload) => {
payload.seq = curr_seq as i64;
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
}
StreamEventPayload::Identity(mut payload) => {
payload.seq = curr_seq as i64;
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
}
StreamEventPayload::Account(mut payload) => {
payload.seq = curr_seq as i64;
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
}
StreamEventPayload::Unknown(payload) => { StreamEventPayload::Unknown(payload) => {
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
} }

View File

@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
use atrium_api::did_doc::DidDocument; use atrium_api::did_doc::DidDocument;
use bytes::Buf; use bytes::Buf;
use http_body_util::BodyExt; use http_body_util::BodyExt;
use hyper::{client::conn::http1, Request, StatusCode}; use hyper::{client::conn::http1, Request, StatusCode, Uri};
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use rustls::pki_types::ServerName; use rustls::pki_types::ServerName;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -19,9 +19,7 @@ pub struct User {
pub did: String, pub did: String,
pub pds: Option<String>, pub pds: Option<String>,
#[serde(default)] #[serde(default)]
pub takedown: bool, pub handle: Option<String>,
#[serde(default)]
pub tombstone: bool,
} }
pub async fn fetch_user(server: &RelayServer, did: &str) -> Result<User> { pub async fn fetch_user(server: &RelayServer, did: &str) -> Result<User> {
@ -48,8 +46,6 @@ pub async fn fetch_user(server: &RelayServer, did: &str) -> Result<User> {
} }
}); });
tracing::debug!("handshake");
let res = sender let res = sender
.send_request(req) .send_request(req)
.await .await
@ -58,20 +54,29 @@ pub async fn fetch_user(server: &RelayServer, did: &str) -> Result<User> {
bail!("plc directory returned non-200 status"); bail!("plc directory returned non-200 status");
} }
tracing::debug!("got response");
let body = res.collect().await?.aggregate(); let body = res.collect().await?.aggregate();
let did_doc = serde_json::from_reader::<_, DidDocument>(body.reader()) let did_doc = serde_json::from_reader::<_, DidDocument>(body.reader())
.context("Failed to parse plc DID doc as JSON")?; .context("Failed to parse plc DID doc as JSON")?;
let user = User { let pds_endpoint = did_doc.get_pds_endpoint();
pds: did_doc.get_pds_endpoint(), let pds_uri: Option<Uri> = pds_endpoint.as_deref().unwrap_or_default().parse().ok();
did: did_doc.id, let pds = pds_uri
takedown: false, .as_ref()
tombstone: false, .and_then(|u| u.authority())
}; .map(|a| a.host())
.map(|s| s.to_string());
store_user(server, &user).await?; let handle = did_doc
.also_known_as
.and_then(|v| v.into_iter().next())
.and_then(|s| s.strip_prefix("at://").map(str::to_string));
let did = did_doc.id;
// TODO: check if handle resolves to did and fill none otherwise
let user = User { pds, did, handle };
store_user(server, &user)?;
Ok(user) Ok(user)
} else if did.starts_with("did:web:") { } else if did.starts_with("did:web:") {
@ -90,7 +95,7 @@ pub async fn lookup_user(server: &RelayServer, did: &str) -> Result<User> {
return fetch_user(server, did).await; return fetch_user(server, did).await;
} }
pub async fn store_user(server: &RelayServer, user: &User) -> Result<()> { pub fn store_user(server: &RelayServer, user: &User) -> Result<()> {
let data = serde_ipld_dagcbor::to_vec(&user)?; let data = serde_ipld_dagcbor::to_vec(&user)?;
server.db_users.insert(&user.did, data)?; server.db_users.insert(&user.did, data)?;
Ok(()) Ok(())

View File

@ -13,6 +13,9 @@ pub struct StreamEventHeader {
pub enum StreamEventPayload { pub enum StreamEventPayload {
Commit(subscribe_repos::Commit), Commit(subscribe_repos::Commit),
Handle(subscribe_repos::Handle),
Identity(subscribe_repos::Identity),
Account(subscribe_repos::Account),
Unknown(BTreeMap<String, Ipld>), Unknown(BTreeMap<String, Ipld>),
} }