handle migrate and tombstone events

main
Charlotte Som 2024-11-26 17:20:51 +02:00
parent 88ac43b18c
commit 6c22e2b4dd
3 changed files with 88 additions and 17 deletions

View File

@ -159,6 +159,18 @@ impl DataServerSubscription {
} }
self.last_seq = Some(event.seq); self.last_seq = Some(event.seq);
if let Some(handle) = event.handle.as_ref() {
let user = fetch_user(&self.server, &event.did).await?;
if user.handle.as_deref() != Some(handle.as_str()) {
tracing::warn!(
seq = %event.seq,
expected = ?handle.as_str(),
got = ?user.handle,
"identity update did not update handle to asserted value"
);
}
}
Ok(Some(StreamEventPayload::Identity(event))) Ok(Some(StreamEventPayload::Identity(event)))
} }
@ -193,9 +205,59 @@ impl DataServerSubscription {
event.active = false; event.active = false;
} }
// TODO: mark user status ?
Ok(Some(StreamEventPayload::Account(event))) Ok(Some(StreamEventPayload::Account(event)))
} }
async fn handle_migrate(
&mut self,
event: subscribe_repos::Migrate,
) -> Result<Option<StreamEventPayload>> {
let last_seq = self.last_seq.unwrap_or_default();
if event.seq < last_seq {
bail!(
"got event out of order from stream (seq = {}, prev = {})",
event.seq,
last_seq
)
}
self.last_seq = Some(event.seq);
// let _user = fetch_user(&self.server, &event.did).await?;
Ok(Some(StreamEventPayload::Migrate(event)))
}
async fn handle_tombstone(
&mut self,
event: subscribe_repos::Tombstone,
) -> Result<Option<StreamEventPayload>> {
let last_seq = self.last_seq.unwrap_or_default();
if event.seq < last_seq {
bail!(
"got event out of order from stream (seq = {}, prev = {})",
event.seq,
last_seq
)
}
self.last_seq = Some(event.seq);
let user = lookup_user(&self.server, &event.did).await?;
let pds = user.pds.as_deref().unwrap_or_default();
if pds != self.host {
bail!(
"unauthoritative tombstone event from {} for {}",
&self.host,
event.did.as_str()
);
}
// TODO: mark user status as deleted ?
Ok(Some(StreamEventPayload::Tombstone(event)))
}
async fn handle_event(&mut self, frame: Bytes) -> Result<()> { async fn handle_event(&mut self, frame: Bytes) -> Result<()> {
let buf: &[u8] = &frame; let buf: &[u8] = &frame;
let mut cursor = Cursor::new(buf); let mut cursor = Cursor::new(buf);
@ -227,7 +289,16 @@ impl DataServerSubscription {
serde_ipld_dagcbor::from_slice::<subscribe_repos::Account>(payload_buf)?; serde_ipld_dagcbor::from_slice::<subscribe_repos::Account>(payload_buf)?;
self.handle_account(payload).await? self.handle_account(payload).await?
} }
// TODO: migrate, tombstone Some("#migrate") => {
let payload =
serde_ipld_dagcbor::from_slice::<subscribe_repos::Migrate>(payload_buf)?;
self.handle_migrate(payload).await?
}
Some("#tombstone") => {
let payload =
serde_ipld_dagcbor::from_slice::<subscribe_repos::Tombstone>(payload_buf)?;
self.handle_tombstone(payload).await?
}
Some("#info") => { Some("#info") => {
let payload = serde_ipld_dagcbor::from_slice::<subscribe_repos::Info>(payload_buf)?; let payload = serde_ipld_dagcbor::from_slice::<subscribe_repos::Info>(payload_buf)?;
if payload.name == "OutdatedCursor" { if payload.name == "OutdatedCursor" {

View File

@ -48,23 +48,21 @@ async fn run_sequencer(
let mut cursor = Cursor::new(Vec::with_capacity(1024 * 1024)); let mut cursor = Cursor::new(Vec::with_capacity(1024 * 1024));
serde_ipld_dagcbor::to_writer(&mut cursor, &header)?; serde_ipld_dagcbor::to_writer(&mut cursor, &header)?;
macro_rules! handle_payload {
($payload:ident) => {{
$payload.seq = curr_seq as i64;
serde_ipld_dagcbor::to_writer(&mut cursor, &$payload)?;
}};
}
match payload { match payload {
StreamEventPayload::Commit(mut payload) => { StreamEventPayload::Commit(mut payload) => handle_payload!(payload),
payload.seq = curr_seq as i64; StreamEventPayload::Handle(mut payload) => handle_payload!(payload),
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; StreamEventPayload::Identity(mut payload) => handle_payload!(payload),
} StreamEventPayload::Account(mut payload) => handle_payload!(payload),
StreamEventPayload::Handle(mut payload) => { StreamEventPayload::Migrate(mut payload) => handle_payload!(payload),
payload.seq = curr_seq as i64; StreamEventPayload::Tombstone(mut payload) => handle_payload!(payload),
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
}
StreamEventPayload::Identity(mut payload) => {
payload.seq = curr_seq as i64;
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
}
StreamEventPayload::Account(mut payload) => {
payload.seq = curr_seq as i64;
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
}
StreamEventPayload::Unknown(payload) => { StreamEventPayload::Unknown(payload) => {
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
} }

View File

@ -16,6 +16,8 @@ pub enum StreamEventPayload {
Handle(subscribe_repos::Handle), Handle(subscribe_repos::Handle),
Identity(subscribe_repos::Identity), Identity(subscribe_repos::Identity),
Account(subscribe_repos::Account), Account(subscribe_repos::Account),
Migrate(subscribe_repos::Migrate),
Tombstone(subscribe_repos::Tombstone),
Unknown(BTreeMap<String, Ipld>), Unknown(BTreeMap<String, Ipld>),
} }