diff --git a/src/indexer.rs b/src/indexer.rs index 1612408..9083848 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -159,6 +159,18 @@ impl DataServerSubscription { } self.last_seq = Some(event.seq); + if let Some(handle) = event.handle.as_ref() { + let user = fetch_user(&self.server, &event.did).await?; + if user.handle.as_deref() != Some(handle.as_str()) { + tracing::warn!( + seq = %event.seq, + expected = ?handle.as_str(), + got = ?user.handle, + "identity update did not update handle to asserted value" + ); + } + } + Ok(Some(StreamEventPayload::Identity(event))) } @@ -193,9 +205,59 @@ impl DataServerSubscription { event.active = false; } + // TODO: mark user status ? + Ok(Some(StreamEventPayload::Account(event))) } + async fn handle_migrate( + &mut self, + event: subscribe_repos::Migrate, + ) -> Result> { + let last_seq = self.last_seq.unwrap_or_default(); + if event.seq < last_seq { + bail!( + "got event out of order from stream (seq = {}, prev = {})", + event.seq, + last_seq + ) + } + self.last_seq = Some(event.seq); + + // let _user = fetch_user(&self.server, &event.did).await?; + + Ok(Some(StreamEventPayload::Migrate(event))) + } + + async fn handle_tombstone( + &mut self, + event: subscribe_repos::Tombstone, + ) -> Result> { + let last_seq = self.last_seq.unwrap_or_default(); + if event.seq < last_seq { + bail!( + "got event out of order from stream (seq = {}, prev = {})", + event.seq, + last_seq + ) + } + self.last_seq = Some(event.seq); + + let user = lookup_user(&self.server, &event.did).await?; + let pds = user.pds.as_deref().unwrap_or_default(); + if pds != self.host { + bail!( + "unauthoritative tombstone event from {} for {}", + &self.host, + event.did.as_str() + ); + } + + // TODO: mark user status as deleted ? + + Ok(Some(StreamEventPayload::Tombstone(event))) + } + async fn handle_event(&mut self, frame: Bytes) -> Result<()> { let buf: &[u8] = &frame; let mut cursor = Cursor::new(buf); @@ -227,7 +289,16 @@ impl DataServerSubscription { serde_ipld_dagcbor::from_slice::(payload_buf)?; self.handle_account(payload).await? } - // TODO: migrate, tombstone + Some("#migrate") => { + let payload = + serde_ipld_dagcbor::from_slice::(payload_buf)?; + self.handle_migrate(payload).await? + } + Some("#tombstone") => { + let payload = + serde_ipld_dagcbor::from_slice::(payload_buf)?; + self.handle_tombstone(payload).await? + } Some("#info") => { let payload = serde_ipld_dagcbor::from_slice::(payload_buf)?; if payload.name == "OutdatedCursor" { diff --git a/src/sequencer.rs b/src/sequencer.rs index 609a686..cdf3d7b 100644 --- a/src/sequencer.rs +++ b/src/sequencer.rs @@ -48,23 +48,21 @@ async fn run_sequencer( let mut cursor = Cursor::new(Vec::with_capacity(1024 * 1024)); serde_ipld_dagcbor::to_writer(&mut cursor, &header)?; + macro_rules! handle_payload { + ($payload:ident) => {{ + $payload.seq = curr_seq as i64; + serde_ipld_dagcbor::to_writer(&mut cursor, &$payload)?; + }}; + } + match payload { - StreamEventPayload::Commit(mut payload) => { - payload.seq = curr_seq as i64; - serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; - } - StreamEventPayload::Handle(mut payload) => { - payload.seq = curr_seq as i64; - serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; - } - StreamEventPayload::Identity(mut payload) => { - payload.seq = curr_seq as i64; - serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; - } - StreamEventPayload::Account(mut payload) => { - payload.seq = curr_seq as i64; - serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; - } + StreamEventPayload::Commit(mut payload) => handle_payload!(payload), + StreamEventPayload::Handle(mut payload) => handle_payload!(payload), + StreamEventPayload::Identity(mut payload) => handle_payload!(payload), + StreamEventPayload::Account(mut payload) => handle_payload!(payload), + StreamEventPayload::Migrate(mut payload) => handle_payload!(payload), + StreamEventPayload::Tombstone(mut payload) => handle_payload!(payload), + StreamEventPayload::Unknown(payload) => { serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; } diff --git a/src/wire_proto.rs b/src/wire_proto.rs index 899a942..973a30b 100644 --- a/src/wire_proto.rs +++ b/src/wire_proto.rs @@ -16,6 +16,8 @@ pub enum StreamEventPayload { Handle(subscribe_repos::Handle), Identity(subscribe_repos::Identity), Account(subscribe_repos::Account), + Migrate(subscribe_repos::Migrate), + Tombstone(subscribe_repos::Tombstone), Unknown(BTreeMap), }