add indexer and sequencer, rebroadcast events with real seq numbers

relaying proof of concept done
This commit is contained in:
Charlotte Som 2024-11-25 06:00:50 +02:00
parent d883c9b10b
commit 0a9b998469
11 changed files with 969 additions and 34 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target
/data

651
Cargo.lock generated
View file

@ -44,6 +44,32 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "aws-lc-rs"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f47bb8cc16b669d267eeccf585aea077d0882f4777b1c1f740217885d6e6e5a3"
dependencies = [
"aws-lc-sys",
"paste",
"zeroize",
]
[[package]]
name = "aws-lc-sys"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2101df3813227bbaaaa0b04cd61c534c7954b22bd68d399b440be937dc63ff7"
dependencies = [
"bindgen",
"cc",
"cmake",
"dunce",
"fs_extra",
"libc",
"paste",
]
[[package]]
name = "backtrace"
version = "0.3.74"
@ -59,12 +85,47 @@ dependencies = [
"windows-targets",
]
[[package]]
name = "base-x"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270"
[[package]]
name = "base64"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "bindgen"
version = "0.69.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088"
dependencies = [
"bitflags 2.6.0",
"cexpr",
"clang-sys",
"itertools",
"lazy_static",
"lazycell",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.89",
"which",
]
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.6.0"
@ -92,6 +153,26 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
[[package]]
name = "cbor4ii"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4"
dependencies = [
"serde",
]
[[package]]
name = "cc"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47"
dependencies = [
"jobserver",
"libc",
"shlex",
]
[[package]]
name = "cerulea_relay"
version = "0.1.0"
@ -101,13 +182,29 @@ dependencies = [
"http-body-util",
"hyper",
"hyper-util",
"ipld-core",
"pin-project-lite",
"qstring",
"rustls",
"serde",
"serde_ipld_dagcbor",
"sled",
"tap",
"tokio",
"tokio-rustls",
"tracing",
"tracing-subscriber",
"uuid",
"webpki-roots",
]
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
@ -116,6 +213,49 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cid"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a"
dependencies = [
"core2",
"multibase",
"multihash",
"serde",
"serde_bytes",
"unsigned-varint",
]
[[package]]
name = "clang-sys"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "cmake"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a"
dependencies = [
"cc",
]
[[package]]
name = "core2"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505"
dependencies = [
"memchr",
]
[[package]]
name = "cpufeatures"
version = "0.2.16"
@ -125,6 +265,30 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -135,6 +299,32 @@ dependencies = [
"typenum",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
[[package]]
name = "data-encoding-macro"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1559b6cba622276d6d63706db152618eeb15b89b3e4041446b05876e352e639"
dependencies = [
"data-encoding",
"data-encoding-macro-internal",
]
[[package]]
name = "data-encoding-macro-internal"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332d754c0af53bc87c108fed664d121ecf59207ec4196041f04d6ab9002ad33f"
dependencies = [
"data-encoding",
"syn 1.0.109",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -145,12 +335,34 @@ dependencies = [
"crypto-common",
]
[[package]]
name = "dunce"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
[[package]]
name = "either"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "fastwebsockets"
version = "0.8.0"
@ -177,6 +389,22 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "fs2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "futures-channel"
version = "0.3.31"
@ -216,6 +444,15 @@ dependencies = [
"pin-utils",
]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]]
name = "generic-array"
version = "0.14.7"
@ -243,6 +480,12 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "h2"
version = "0.4.7"
@ -274,6 +517,15 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "home"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
dependencies = [
"windows-sys",
]
[[package]]
name = "http"
version = "1.1.0"
@ -369,24 +621,84 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
dependencies = [
"cfg-if",
]
[[package]]
name = "ipld-core"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ede82a79e134f179f4b29b5fdb1eb92bd1b38c4dfea394c539051150a21b9b"
dependencies = [
"cid",
"serde",
"serde_bytes",
]
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2"
[[package]]
name = "jobserver"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
dependencies = [
"libc",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.164"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f"
[[package]]
name = "libloading"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
[[package]]
name = "lock_api"
version = "0.4.12"
@ -418,6 +730,12 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.8.0"
@ -439,6 +757,38 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "multibase"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404"
dependencies = [
"base-x",
"data-encoding",
"data-encoding-macro",
]
[[package]]
name = "multihash"
version = "0.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc41f430805af9d1cf4adae4ed2149c759b877b01d909a1f40256188d09345d2"
dependencies = [
"core2",
"serde",
"unsigned-varint",
]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -470,6 +820,17 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
@ -477,7 +838,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
"parking_lot_core 0.9.10",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
]
[[package]]
@ -488,11 +863,17 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"redox_syscall 0.5.7",
"smallvec",
"windows-targets",
]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "percent-encoding"
version = "2.3.1"
@ -516,7 +897,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.89",
]
[[package]]
@ -540,6 +921,16 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "prettyplease"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033"
dependencies = [
"proc-macro2",
"syn 2.0.89",
]
[[package]]
name = "proc-macro2"
version = "1.0.92"
@ -597,13 +988,22 @@ dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
dependencies = [
"bitflags",
"bitflags 2.6.0",
]
[[package]]
@ -650,18 +1050,126 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "ring"
version = "0.17.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
dependencies = [
"cc",
"cfg-if",
"getrandom",
"libc",
"spin",
"untrusted",
"windows-sys",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustix"
version = "0.38.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6"
dependencies = [
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys",
]
[[package]]
name = "rustls"
version = "0.23.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f"
dependencies = [
"aws-lc-rs",
"log",
"once_cell",
"rustls-pki-types",
"rustls-webpki",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-pki-types"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b"
[[package]]
name = "rustls-webpki"
version = "0.102.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
dependencies = [
"aws-lc-rs",
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.215"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_bytes"
version = "0.11.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a"
dependencies = [
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.215"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.89",
]
[[package]]
name = "serde_ipld_dagcbor"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded35fbe4ab8fdec1f1d14b4daff2206b1eada4d6e708cb451d464d2d965f493"
dependencies = [
"cbor4ii",
"ipld-core",
"scopeguard",
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"
@ -682,6 +1190,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
@ -706,6 +1220,23 @@ dependencies = [
"autocfg",
]
[[package]]
name = "sled"
version = "0.34.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935"
dependencies = [
"crc32fast",
"crossbeam-epoch",
"crossbeam-utils",
"fs2",
"fxhash",
"libc",
"log",
"parking_lot 0.11.2",
"zstd",
]
[[package]]
name = "smallvec"
version = "1.13.2"
@ -722,6 +1253,29 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "subtle"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.89"
@ -756,7 +1310,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.89",
]
[[package]]
@ -779,7 +1333,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -795,7 +1349,18 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.89",
]
[[package]]
name = "tokio-rustls"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls",
"rustls-pki-types",
"tokio",
]
[[package]]
@ -836,7 +1401,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.89",
]
[[package]]
@ -896,6 +1461,18 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
[[package]]
name = "unsigned-varint"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "utf-8"
version = "0.7.6"
@ -938,6 +1515,27 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "webpki-roots"
version = "0.26.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix",
]
[[package]]
name = "winapi"
version = "0.3.9"
@ -1051,5 +1649,40 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.89",
]
[[package]]
name = "zeroize"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
[[package]]
name = "zstd"
version = "0.9.2+zstd.1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2390ea1bf6c038c39674f22d95f0564725fc06034a47129179810b2fc58caa54"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "4.1.3+zstd.1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e99d81b99fb3c2c2c794e3fe56c305c63d5173a16a46b5850b07c935ffc7db79"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "1.6.2+zstd.1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2daf2f248d9ea44454bfcb2516534e8b8ad2fc91bf818a1885495fc42bc8ac9f"
dependencies = [
"cc",
"libc",
]

View file

@ -9,10 +9,17 @@ fastwebsockets = { version = "0.8.0", features = ["hyper", "unstable-split", "up
http-body-util = "0.1.2"
hyper = { version = "1.5.1", features = ["client", "full", "http1", "http2", "server"] }
hyper-util = { version = "0.1.10", features = ["tokio", "server", "client", "http1", "http2"] }
ipld-core = "0.4.1"
pin-project-lite = "0.2.15"
qstring = "0.7.2"
rustls = "0.23.18"
serde = { version = "1.0.215", features = ["derive"] }
serde_ipld_dagcbor = "0.6.1"
sled = { version = "0.34.7", features = ["compression"] }
tap = "1.0.1"
tokio = { version = "1.41.1", features = ["full"] }
tokio-rustls = "0.26.0"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { version = "1.11.0", features = ["v4"] }
webpki-roots = "0.26.7"

View file

@ -6,3 +6,17 @@ The idea is that we can have much larger limits if we scale down the volume of t
- Large block sizes
- Large record size limit
- etcetcetc
## todo
- metrics / tracing / observability shit
- keep track of currently subscribed PDSes in application state
- discover PDSes via crawling instead of hardcoding 1 (lol)
- do not allow PDSes with more than 1000 repos
- robustness:
- try to reconnect (tamed exponential backoff) if we lose connection to a PDS
- allow catchup with cursor based on history data
- history:
- timestamp instead of seq number as key
- purge based on ttl
- takedowns probably

View file

@ -1,4 +1,4 @@
use crate::{prelude::*, relay_subscription::handle_subscription};
use crate::{prelude::*, relay_subscription::handle_subscription, RelayServer};
use std::{net::SocketAddr, sync::Arc};
@ -12,17 +12,6 @@ use hyper::{
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
pub struct RelayServer {
pub block_tx: tokio::sync::broadcast::Sender<Bytes>,
}
impl Default for RelayServer {
fn default() -> Self {
let (block_tx, _) = tokio::sync::broadcast::channel::<Bytes>(128);
Self { block_tx }
}
}
pub type ServerResponseBody = BoxBody<Bytes, hyper::Error>;
pub fn empty() -> ServerResponseBody {
Empty::<Bytes>::new().map_err(|e| match e {}).boxed()

171
src/indexer.rs Normal file
View file

@ -0,0 +1,171 @@
use std::{io::Cursor, sync::Arc};
use anyhow::{anyhow, Result};
use fastwebsockets::{FragmentCollector, OpCode, Payload};
use hyper::{body::Bytes, header, upgrade::Upgraded, Request};
use hyper_util::rt::{TokioExecutor, TokioIo};
use ipld_core::ipld::Ipld;
use serde_ipld_dagcbor::DecodeError;
use tokio::{
net::TcpStream,
sync::{
broadcast::{self},
mpsc,
},
};
use tokio_rustls::{rustls::pki_types::ServerName, TlsConnector};
use crate::{
http::empty,
wire_proto::{StreamingEvent, SubscriptionHeader},
RelayServer,
};
async fn create_ws_client(
domain: &str,
port: u16,
path: &str,
) -> Result<FragmentCollector<TokioIo<Upgraded>>> {
let addr = format!("{}:{}", domain, port);
let tcp_stream = TcpStream::connect(&addr).await?;
let root_store =
rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let domain_tls = ServerName::try_from(domain.to_string())?;
let client_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let tls_connector = TlsConnector::from(Arc::new(client_config));
let tls_stream = tls_connector.connect(domain_tls, tcp_stream).await?;
let req = Request::builder()
.method("GET")
.uri(format!("wss://{}:{}{}", &domain, port, path))
.header("Host", &addr)
.header(header::UPGRADE, "websocket")
.header(header::CONNECTION, "upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.body(empty())?;
let (mut ws, _) = fastwebsockets::handshake::client(&TokioExecutor::new(), req, tls_stream)
.await
.unwrap();
ws.set_auto_pong(true);
ws.set_auto_close(true);
Ok(FragmentCollector::new(ws))
}
struct DataServerSubscription {
host: String,
raw_block_tx: broadcast::Sender<Bytes>,
event_tx: mpsc::Sender<StreamingEvent>,
last_seq: Option<i128>,
}
impl DataServerSubscription {
fn new(server: &RelayServer, host: String) -> Self {
Self {
host,
raw_block_tx: server.raw_block_tx.clone(),
event_tx: server.event_tx.clone(),
last_seq: None,
}
}
async fn handle_event(&mut self, frame: Bytes) -> Result<()> {
let buf: &[u8] = &frame;
let mut cursor = Cursor::new(buf);
let (header_buf, payload_buf) =
match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
Err(DecodeError::TrailingData) => buf.split_at(cursor.position() as usize),
_ => return Err(anyhow!("invalid frame type")),
};
let header = serde_ipld_dagcbor::from_slice::<SubscriptionHeader>(header_buf)?;
let payload = serde_ipld_dagcbor::from_slice::<Ipld>(payload_buf)?;
let Ipld::Map(payload) = payload else {
return Err(anyhow!(
"invalid payload type (expected Map, got {:?})",
payload.kind()
)); // message payloads must always be objects
};
match header.t.as_deref() {
Some("#commit") | Some("#handle") | Some("#identity") | Some("#account")
| Some("#migrate") | Some("#tombstone") | Some("#labels") => {
if let Some(Ipld::Integer(i)) = payload.get("seq") {
self.last_seq = Some(*i);
}
// send to sequencer, rewrites `seq` in payload
self.event_tx.send((header, payload)).await?;
}
// Some("#info") | unknown
_ => {
// no need to do sequence numbering :3 we can just emit the raw data
self.raw_block_tx.send(frame)?;
}
}
Ok(())
}
}
pub async fn subscribe_to_host(server: &RelayServer, host: String) -> Result<()> {
tracing::debug!(%host, "establishing connection");
let mut subscription = DataServerSubscription::new(server, host);
// TODO: reconnect (with backoff) (using cursor) if we lose connection
let mut ws = create_ws_client(
&subscription.host,
443,
"/xrpc/com.atproto.sync.subscribeRepos",
)
.await?;
tracing::debug!(host = %subscription.host, "listening");
while let Ok(frame) = ws.read_frame().await {
if frame.opcode == OpCode::Binary {
let bytes = match frame.payload {
Payload::BorrowedMut(slice) => Bytes::from(&*slice),
Payload::Borrowed(slice) => Bytes::from(slice),
Payload::Owned(vec) => Bytes::from(vec),
Payload::Bytes(bytes_mut) => Bytes::from(bytes_mut),
};
if let Err(e) = subscription.handle_event(bytes).await {
tracing::error!("error handling event (skipping): {e:?}");
continue;
}
}
}
tracing::debug!(host = %subscription.host, "disconnected");
Ok(())
}
pub fn index_servers(server: Arc<RelayServer>, hosts: &[String]) {
// in future we will spider out but right now i just want da stuff from my PDS
// TODO: we should be able to track and close / retry these connections lol
for host in hosts.iter() {
let host = host.to_string();
let server = Arc::clone(&server);
tokio::task::spawn(async move {
if let Err(e) = subscribe_to_host(&server, host).await {
tracing::warn!("encountered error subscribing to PDS: {e:?}");
}
});
}
}

View file

@ -1,4 +1,29 @@
use hyper::body::Bytes;
use tokio::sync::{broadcast, mpsc};
use wire_proto::StreamingEvent;
pub mod prelude;
pub struct RelayServer {
pub db: sled::Db,
pub event_tx: mpsc::Sender<StreamingEvent>,
pub raw_block_tx: broadcast::Sender<Bytes>,
}
impl RelayServer {
pub fn new(db: sled::Db, event_tx: mpsc::Sender<StreamingEvent>) -> Self {
let (raw_block_tx, _) = broadcast::channel(128);
Self {
db,
event_tx,
raw_block_tx,
}
}
}
pub mod http;
pub mod indexer;
pub mod relay_subscription;
pub mod server;
pub mod sequencer;
pub mod wire_proto;

View file

@ -1,21 +1,32 @@
use anyhow::Result;
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, str::FromStr, sync::Arc};
use tokio::sync::mpsc;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use cerulea_relay::server::{self, RelayServer};
use cerulea_relay::{
http::{self},
indexer::index_servers,
sequencer::start_sequencer,
RelayServer,
};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.with(EnvFilter::from_str("cerulea_relay=debug").unwrap())
.init();
let server = Arc::new(RelayServer::default());
let db = sled::open("data").expect("Failed to open database");
// TODO: scrape some dudes
let (event_tx, event_rx) = mpsc::channel(128);
let server = Arc::new(RelayServer::new(db, event_tx));
index_servers(Arc::clone(&server), &["pds.bun.how".into()]);
start_sequencer(Arc::clone(&server), event_rx);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
server::listen(Arc::clone(&server), addr).await?;
http::listen(server, addr).await?;
Ok(())
}

View file

@ -1,4 +1,4 @@
use crate::prelude::*;
use crate::{prelude::*, RelayServer};
use std::sync::Arc;
@ -23,7 +23,7 @@ use tokio::{
};
use uuid::Uuid;
use crate::server::{empty, full, RelayServer, ServerResponse};
use crate::http::{empty, full, ServerResponse};
enum Operation<'f> {
NoOp,
@ -48,7 +48,9 @@ impl RelaySubscription {
let (ws_rx, ws_tx) = ws.split(|stream| {
let upgraded = stream.into_inner();
let parts = upgraded.downcast::<TokioIo<TcpStream>>().expect("uhhhh");
let parts = upgraded
.downcast::<TokioIo<TcpStream>>()
.expect("HTTP stream should be a TokioIo<TcpStream> !");
let (read, write) = parts.io.into_inner().into_split();
(read, write)
});
@ -135,13 +137,13 @@ async fn run_subscription(
if let Some(_cursor) = cursor {
tracing::debug!(id = %sub.id, "filling from event cache");
// TODO: cursor catchup
// TODO: cursor catchup (read from server db history)
tracing::debug!(id = %sub.id, "subscription live-tailing");
}
// live tailing:
let mut block_rx = server.block_tx.subscribe();
let mut block_rx = server.raw_block_tx.subscribe();
while sub.running {
let op = tokio::select! {
biased;
@ -179,5 +181,5 @@ pub async fn handle_subscription(
});
let (head, _) = res.into_parts();
Ok(Response::from_parts(head, empty()))
Response::from_parts(head, empty()).pipe(Ok)
}

69
src/sequencer.rs Normal file
View file

@ -0,0 +1,69 @@
use std::{io::Cursor, sync::Arc};
use anyhow::Result;
use hyper::body::Bytes;
use ipld_core::ipld::Ipld;
use tokio::sync::mpsc;
use crate::{wire_proto::StreamingEvent, RelayServer};
async fn run_sequencer(
server: Arc<RelayServer>,
mut event_rx: mpsc::Receiver<StreamingEvent>,
) -> Result<()> {
let db = server.db.clone();
let mut curr_seq = db
.get(b"curr_seq")?
.map(|v| {
let mut buf = [0u8; 8];
let len = 8.min(v.len());
buf[..len].copy_from_slice(&v[..len]);
u64::from_le_bytes(buf)
})
.unwrap_or_default();
let events = db.open_tree(b"events")?;
tracing::debug!(seq = %curr_seq, "initial sequence number");
while let Some((header, mut payload)) = event_rx.recv().await {
let seq_bump = matches!(
header.t.as_deref(),
Some("#commit")
| Some("#handle")
| Some("#identity")
| Some("#account")
| Some("#migrate")
| Some("#tombstone")
| Some("#labels")
);
if seq_bump {
curr_seq += 1;
payload.insert("seq".into(), Ipld::Integer(curr_seq as i128));
db.insert(b"curr_seq", &u64::to_le_bytes(curr_seq))?;
}
let mut cursor = Cursor::new(Vec::with_capacity(1024 * 1024));
serde_ipld_dagcbor::to_writer(&mut cursor, &header)?;
serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?;
let data = Bytes::from(cursor.into_inner());
if seq_bump {
server.raw_block_tx.send(data.clone())?;
events.insert(u64::to_be_bytes(curr_seq), &*data)?;
} else {
server.raw_block_tx.send(data)?;
}
}
Ok(())
}
pub fn start_sequencer(server: Arc<RelayServer>, event_rx: mpsc::Receiver<StreamingEvent>) {
tokio::task::spawn(async move {
if let Err(e) = run_sequencer(server, event_rx).await {
tracing::error!("sequencer error: {e:?}");
}
});
}

13
src/wire_proto.rs Normal file
View file

@ -0,0 +1,13 @@
use std::collections::BTreeMap;
use ipld_core::ipld::Ipld;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct SubscriptionHeader {
pub op: i64,
#[serde(default)]
pub t: Option<String>,
}
pub type StreamingEvent = (SubscriptionHeader, BTreeMap<String, Ipld>);