diff --git a/src/indexer.rs b/src/indexer.rs
index 6c21628..1612408 100644
--- a/src/indexer.rs
+++ b/src/indexer.rs
@@ -4,7 +4,7 @@ use anyhow::{bail, Result};
use atrium_api::com::atproto::sync::subscribe_repos;
use bytes::Bytes;
use fastwebsockets::{FragmentCollector, OpCode, Payload, WebSocketError};
-use hyper::{header, upgrade::Upgraded, Request, Uri};
+use hyper::{header, upgrade::Upgraded, Request};
use hyper_util::rt::{TokioExecutor, TokioIo};
use ipld_core::ipld::Ipld;
use serde_ipld_dagcbor::DecodeError;
@@ -14,7 +14,7 @@ use tokio_rustls::rustls::pki_types::ServerName;
use crate::{
http::body_empty,
tls::open_tls_stream,
- user::lookup_user,
+ user::{fetch_user, lookup_user},
wire_proto::{StreamEvent, StreamEventHeader, StreamEventPayload},
RelayServer,
};
@@ -42,9 +42,8 @@ async fn create_ws_client(
.header("Sec-WebSocket-Version", "13")
.body(body_empty())?;
- let (mut ws, _) = fastwebsockets::handshake::client(&TokioExecutor::new(), req, tls_stream)
- .await
- .unwrap();
+ let (mut ws, _) =
+ fastwebsockets::handshake::client(&TokioExecutor::new(), req, tls_stream).await?;
ws.set_auto_pong(true);
ws.set_auto_close(true);
@@ -70,33 +69,134 @@ impl DataServerSubscription {
async fn handle_commit(
&mut self,
- payload: subscribe_repos::Commit,
+ event: subscribe_repos::Commit,
) -> Result