From 0a9b998469033294a46f8affe1bf2afee814608f Mon Sep 17 00:00:00 2001 From: Charlotte Som Date: Mon, 25 Nov 2024 06:00:50 +0200 Subject: [PATCH] add indexer and sequencer, rebroadcast events with real seq numbers relaying proof of concept done --- .gitignore | 1 + Cargo.lock | 651 ++++++++++++++++++++++++++++++++++++- Cargo.toml | 7 + README.md | 14 + src/{server.rs => http.rs} | 13 +- src/indexer.rs | 171 ++++++++++ src/lib.rs | 27 +- src/main.rs | 23 +- src/relay_subscription.rs | 14 +- src/sequencer.rs | 69 ++++ src/wire_proto.rs | 13 + 11 files changed, 969 insertions(+), 34 deletions(-) rename src/{server.rs => http.rs} (86%) create mode 100644 src/indexer.rs create mode 100644 src/sequencer.rs create mode 100644 src/wire_proto.rs diff --git a/.gitignore b/.gitignore index ea8c4bf..a727c0a 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +/data diff --git a/Cargo.lock b/Cargo.lock index df3f2b2..8276aef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,32 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "aws-lc-rs" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f47bb8cc16b669d267eeccf585aea077d0882f4777b1c1f740217885d6e6e5a3" +dependencies = [ + "aws-lc-sys", + "paste", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2101df3813227bbaaaa0b04cd61c534c7954b22bd68d399b440be937dc63ff7" +dependencies = [ + "bindgen", + "cc", + "cmake", + "dunce", + "fs_extra", + "libc", + "paste", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -59,12 +85,47 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "base-x" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" + [[package]] name = "base64" version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags 2.6.0", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.89", + "which", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.6.0" @@ -92,6 +153,26 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +[[package]] +name = "cbor4ii" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4" +dependencies = [ + "serde", +] + +[[package]] +name = "cc" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" +dependencies = [ + "jobserver", + "libc", + "shlex", +] + [[package]] name = "cerulea_relay" version = "0.1.0" @@ -101,13 +182,29 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", + "ipld-core", "pin-project-lite", "qstring", + "rustls", + "serde", + "serde_ipld_dagcbor", + "sled", "tap", "tokio", + "tokio-rustls", "tracing", "tracing-subscriber", "uuid", + "webpki-roots", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", ] [[package]] @@ -116,6 +213,49 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cid" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a" +dependencies = [ + "core2", + "multibase", + "multihash", + "serde", + "serde_bytes", + "unsigned-varint", +] + +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + +[[package]] +name = "cmake" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a" +dependencies = [ + "cc", +] + +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.16" @@ -125,6 +265,30 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -135,6 +299,32 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + +[[package]] +name = "data-encoding-macro" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1559b6cba622276d6d63706db152618eeb15b89b3e4041446b05876e352e639" +dependencies = [ + "data-encoding", + "data-encoding-macro-internal", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332d754c0af53bc87c108fed664d121ecf59207ec4196041f04d6ab9002ad33f" +dependencies = [ + "data-encoding", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -145,12 +335,34 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + +[[package]] +name = "either" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" + [[package]] name = "equivalent" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "fastwebsockets" version = "0.8.0" @@ -177,6 +389,22 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures-channel" version = "0.3.31" @@ -216,6 +444,15 @@ dependencies = [ "pin-utils", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -243,6 +480,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.4.7" @@ -274,6 +517,15 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys", +] + [[package]] name = "http" version = "1.1.0" @@ -369,24 +621,84 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "ipld-core" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ede82a79e134f179f4b29b5fdb1eb92bd1b38c4dfea394c539051150a21b9b" +dependencies = [ + "cid", + "serde", + "serde_bytes", +] + +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +[[package]] +name = "libloading" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" +dependencies = [ + "cfg-if", + "windows-targets", +] + +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "lock_api" version = "0.4.12" @@ -418,6 +730,12 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -439,6 +757,38 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multibase" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404" +dependencies = [ + "base-x", + "data-encoding", + "data-encoding-macro", +] + +[[package]] +name = "multihash" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc41f430805af9d1cf4adae4ed2149c759b877b01d909a1f40256188d09345d2" +dependencies = [ + "core2", + "serde", + "unsigned-varint", +] + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -470,6 +820,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -477,7 +838,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.10", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -488,11 +863,17 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.7", "smallvec", "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -516,7 +897,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -540,6 +921,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" +dependencies = [ + "proc-macro2", + "syn 2.0.89", +] + [[package]] name = "proc-macro2" version = "1.0.92" @@ -597,13 +988,22 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ - "bitflags", + "bitflags 2.6.0", ] [[package]] @@ -650,18 +1050,126 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys", +] + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + +[[package]] +name = "rustix" +version = "0.38.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" +dependencies = [ + "bitflags 2.6.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + +[[package]] +name = "rustls" +version = "0.23.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" +dependencies = [ + "aws-lc-rs", + "log", + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" + +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "aws-lc-rs", + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "serde" +version = "1.0.215" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_bytes" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_derive" +version = "1.0.215" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.89", +] + +[[package]] +name = "serde_ipld_dagcbor" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ded35fbe4ab8fdec1f1d14b4daff2206b1eada4d6e708cb451d464d2d965f493" +dependencies = [ + "cbor4ii", + "ipld-core", + "scopeguard", + "serde", +] + [[package]] name = "sha1" version = "0.10.6" @@ -682,6 +1190,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -706,6 +1220,23 @@ dependencies = [ "autocfg", ] +[[package]] +name = "sled" +version = "0.34.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log", + "parking_lot 0.11.2", + "zstd", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -722,6 +1253,29 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.89" @@ -756,7 +1310,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -779,7 +1333,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -795,7 +1349,18 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", ] [[package]] @@ -836,7 +1401,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", ] [[package]] @@ -896,6 +1461,18 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "unsigned-varint" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" + +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "utf-8" version = "0.7.6" @@ -938,6 +1515,27 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "webpki-roots" +version = "0.26.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1051,5 +1649,40 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.89", +] + +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zstd" +version = "0.9.2+zstd.1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2390ea1bf6c038c39674f22d95f0564725fc06034a47129179810b2fc58caa54" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "4.1.3+zstd.1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e99d81b99fb3c2c2c794e3fe56c305c63d5173a16a46b5850b07c935ffc7db79" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.6.2+zstd.1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2daf2f248d9ea44454bfcb2516534e8b8ad2fc91bf818a1885495fc42bc8ac9f" +dependencies = [ + "cc", + "libc", ] diff --git a/Cargo.toml b/Cargo.toml index 0028d1b..daeab84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,17 @@ fastwebsockets = { version = "0.8.0", features = ["hyper", "unstable-split", "up http-body-util = "0.1.2" hyper = { version = "1.5.1", features = ["client", "full", "http1", "http2", "server"] } hyper-util = { version = "0.1.10", features = ["tokio", "server", "client", "http1", "http2"] } +ipld-core = "0.4.1" pin-project-lite = "0.2.15" qstring = "0.7.2" +rustls = "0.23.18" +serde = { version = "1.0.215", features = ["derive"] } +serde_ipld_dagcbor = "0.6.1" +sled = { version = "0.34.7", features = ["compression"] } tap = "1.0.1" tokio = { version = "1.41.1", features = ["full"] } +tokio-rustls = "0.26.0" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } uuid = { version = "1.11.0", features = ["v4"] } +webpki-roots = "0.26.7" diff --git a/README.md b/README.md index b0baea7..42523db 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,17 @@ The idea is that we can have much larger limits if we scale down the volume of t - Large block sizes - Large record size limit - etcetcetc + +## todo + +- metrics / tracing / observability shit +- keep track of currently subscribed PDSes in application state +- discover PDSes via crawling instead of hardcoding 1 (lol) + - do not allow PDSes with more than 1000 repos +- robustness: + - try to reconnect (tamed exponential backoff) if we lose connection to a PDS + - allow catchup with cursor based on history data +- history: + - timestamp instead of seq number as key + - purge based on ttl + - takedowns probably diff --git a/src/server.rs b/src/http.rs similarity index 86% rename from src/server.rs rename to src/http.rs index bf49bd2..84ffa83 100644 --- a/src/server.rs +++ b/src/http.rs @@ -1,4 +1,4 @@ -use crate::{prelude::*, relay_subscription::handle_subscription}; +use crate::{prelude::*, relay_subscription::handle_subscription, RelayServer}; use std::{net::SocketAddr, sync::Arc}; @@ -12,17 +12,6 @@ use hyper::{ use hyper_util::rt::TokioIo; use tokio::net::TcpListener; -pub struct RelayServer { - pub block_tx: tokio::sync::broadcast::Sender, -} - -impl Default for RelayServer { - fn default() -> Self { - let (block_tx, _) = tokio::sync::broadcast::channel::(128); - Self { block_tx } - } -} - pub type ServerResponseBody = BoxBody; pub fn empty() -> ServerResponseBody { Empty::::new().map_err(|e| match e {}).boxed() diff --git a/src/indexer.rs b/src/indexer.rs new file mode 100644 index 0000000..1ce5bbc --- /dev/null +++ b/src/indexer.rs @@ -0,0 +1,171 @@ +use std::{io::Cursor, sync::Arc}; + +use anyhow::{anyhow, Result}; +use fastwebsockets::{FragmentCollector, OpCode, Payload}; +use hyper::{body::Bytes, header, upgrade::Upgraded, Request}; +use hyper_util::rt::{TokioExecutor, TokioIo}; +use ipld_core::ipld::Ipld; +use serde_ipld_dagcbor::DecodeError; +use tokio::{ + net::TcpStream, + sync::{ + broadcast::{self}, + mpsc, + }, +}; +use tokio_rustls::{rustls::pki_types::ServerName, TlsConnector}; + +use crate::{ + http::empty, + wire_proto::{StreamingEvent, SubscriptionHeader}, + RelayServer, +}; + +async fn create_ws_client( + domain: &str, + port: u16, + path: &str, +) -> Result>> { + let addr = format!("{}:{}", domain, port); + let tcp_stream = TcpStream::connect(&addr).await?; + + let root_store = + rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + let domain_tls = ServerName::try_from(domain.to_string())?; + let client_config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + let tls_connector = TlsConnector::from(Arc::new(client_config)); + let tls_stream = tls_connector.connect(domain_tls, tcp_stream).await?; + + let req = Request::builder() + .method("GET") + .uri(format!("wss://{}:{}{}", &domain, port, path)) + .header("Host", &addr) + .header(header::UPGRADE, "websocket") + .header(header::CONNECTION, "upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .body(empty())?; + + let (mut ws, _) = fastwebsockets::handshake::client(&TokioExecutor::new(), req, tls_stream) + .await + .unwrap(); + ws.set_auto_pong(true); + ws.set_auto_close(true); + + Ok(FragmentCollector::new(ws)) +} + +struct DataServerSubscription { + host: String, + raw_block_tx: broadcast::Sender, + event_tx: mpsc::Sender, + last_seq: Option, +} + +impl DataServerSubscription { + fn new(server: &RelayServer, host: String) -> Self { + Self { + host, + raw_block_tx: server.raw_block_tx.clone(), + event_tx: server.event_tx.clone(), + last_seq: None, + } + } + + async fn handle_event(&mut self, frame: Bytes) -> Result<()> { + let buf: &[u8] = &frame; + let mut cursor = Cursor::new(buf); + let (header_buf, payload_buf) = + match serde_ipld_dagcbor::from_reader::(&mut cursor) { + Err(DecodeError::TrailingData) => buf.split_at(cursor.position() as usize), + _ => return Err(anyhow!("invalid frame type")), + }; + let header = serde_ipld_dagcbor::from_slice::(header_buf)?; + let payload = serde_ipld_dagcbor::from_slice::(payload_buf)?; + + let Ipld::Map(payload) = payload else { + return Err(anyhow!( + "invalid payload type (expected Map, got {:?})", + payload.kind() + )); // message payloads must always be objects + }; + + match header.t.as_deref() { + Some("#commit") | Some("#handle") | Some("#identity") | Some("#account") + | Some("#migrate") | Some("#tombstone") | Some("#labels") => { + if let Some(Ipld::Integer(i)) = payload.get("seq") { + self.last_seq = Some(*i); + } + + // send to sequencer, rewrites `seq` in payload + self.event_tx.send((header, payload)).await?; + } + + // Some("#info") | unknown + _ => { + // no need to do sequence numbering :3 we can just emit the raw data + self.raw_block_tx.send(frame)?; + } + } + + Ok(()) + } +} + +pub async fn subscribe_to_host(server: &RelayServer, host: String) -> Result<()> { + tracing::debug!(%host, "establishing connection"); + + let mut subscription = DataServerSubscription::new(server, host); + + // TODO: reconnect (with backoff) (using cursor) if we lose connection + + let mut ws = create_ws_client( + &subscription.host, + 443, + "/xrpc/com.atproto.sync.subscribeRepos", + ) + .await?; + + tracing::debug!(host = %subscription.host, "listening"); + + while let Ok(frame) = ws.read_frame().await { + if frame.opcode == OpCode::Binary { + let bytes = match frame.payload { + Payload::BorrowedMut(slice) => Bytes::from(&*slice), + Payload::Borrowed(slice) => Bytes::from(slice), + Payload::Owned(vec) => Bytes::from(vec), + Payload::Bytes(bytes_mut) => Bytes::from(bytes_mut), + }; + + if let Err(e) = subscription.handle_event(bytes).await { + tracing::error!("error handling event (skipping): {e:?}"); + continue; + } + } + } + + tracing::debug!(host = %subscription.host, "disconnected"); + + Ok(()) +} + +pub fn index_servers(server: Arc, hosts: &[String]) { + // in future we will spider out but right now i just want da stuff from my PDS + + // TODO: we should be able to track and close / retry these connections lol + + for host in hosts.iter() { + let host = host.to_string(); + let server = Arc::clone(&server); + tokio::task::spawn(async move { + if let Err(e) = subscribe_to_host(&server, host).await { + tracing::warn!("encountered error subscribing to PDS: {e:?}"); + } + }); + } +} diff --git a/src/lib.rs b/src/lib.rs index 163d56f..86b1a1a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,29 @@ +use hyper::body::Bytes; +use tokio::sync::{broadcast, mpsc}; +use wire_proto::StreamingEvent; + pub mod prelude; +pub struct RelayServer { + pub db: sled::Db, + + pub event_tx: mpsc::Sender, + pub raw_block_tx: broadcast::Sender, +} + +impl RelayServer { + pub fn new(db: sled::Db, event_tx: mpsc::Sender) -> Self { + let (raw_block_tx, _) = broadcast::channel(128); + Self { + db, + event_tx, + raw_block_tx, + } + } +} + +pub mod http; +pub mod indexer; pub mod relay_subscription; -pub mod server; +pub mod sequencer; +pub mod wire_proto; diff --git a/src/main.rs b/src/main.rs index 14a480a..3bac25f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,32 @@ use anyhow::Result; -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, str::FromStr, sync::Arc}; +use tokio::sync::mpsc; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use cerulea_relay::server::{self, RelayServer}; +use cerulea_relay::{ + http::{self}, + indexer::index_servers, + sequencer::start_sequencer, + RelayServer, +}; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::registry() .with(fmt::layer()) - .with(EnvFilter::from_default_env()) + .with(EnvFilter::from_str("cerulea_relay=debug").unwrap()) .init(); - let server = Arc::new(RelayServer::default()); + let db = sled::open("data").expect("Failed to open database"); - // TODO: scrape some dudes + let (event_tx, event_rx) = mpsc::channel(128); + + let server = Arc::new(RelayServer::new(db, event_tx)); + + index_servers(Arc::clone(&server), &["pds.bun.how".into()]); + start_sequencer(Arc::clone(&server), event_rx); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - server::listen(Arc::clone(&server), addr).await?; + http::listen(server, addr).await?; Ok(()) } diff --git a/src/relay_subscription.rs b/src/relay_subscription.rs index 35b79ee..e2898df 100644 --- a/src/relay_subscription.rs +++ b/src/relay_subscription.rs @@ -1,4 +1,4 @@ -use crate::prelude::*; +use crate::{prelude::*, RelayServer}; use std::sync::Arc; @@ -23,7 +23,7 @@ use tokio::{ }; use uuid::Uuid; -use crate::server::{empty, full, RelayServer, ServerResponse}; +use crate::http::{empty, full, ServerResponse}; enum Operation<'f> { NoOp, @@ -48,7 +48,9 @@ impl RelaySubscription { let (ws_rx, ws_tx) = ws.split(|stream| { let upgraded = stream.into_inner(); - let parts = upgraded.downcast::>().expect("uhhhh"); + let parts = upgraded + .downcast::>() + .expect("HTTP stream should be a TokioIo !"); let (read, write) = parts.io.into_inner().into_split(); (read, write) }); @@ -135,13 +137,13 @@ async fn run_subscription( if let Some(_cursor) = cursor { tracing::debug!(id = %sub.id, "filling from event cache"); - // TODO: cursor catchup + // TODO: cursor catchup (read from server db history) tracing::debug!(id = %sub.id, "subscription live-tailing"); } // live tailing: - let mut block_rx = server.block_tx.subscribe(); + let mut block_rx = server.raw_block_tx.subscribe(); while sub.running { let op = tokio::select! { biased; @@ -179,5 +181,5 @@ pub async fn handle_subscription( }); let (head, _) = res.into_parts(); - Ok(Response::from_parts(head, empty())) + Response::from_parts(head, empty()).pipe(Ok) } diff --git a/src/sequencer.rs b/src/sequencer.rs new file mode 100644 index 0000000..39195ba --- /dev/null +++ b/src/sequencer.rs @@ -0,0 +1,69 @@ +use std::{io::Cursor, sync::Arc}; + +use anyhow::Result; +use hyper::body::Bytes; +use ipld_core::ipld::Ipld; +use tokio::sync::mpsc; + +use crate::{wire_proto::StreamingEvent, RelayServer}; + +async fn run_sequencer( + server: Arc, + mut event_rx: mpsc::Receiver, +) -> Result<()> { + let db = server.db.clone(); + let mut curr_seq = db + .get(b"curr_seq")? + .map(|v| { + let mut buf = [0u8; 8]; + let len = 8.min(v.len()); + buf[..len].copy_from_slice(&v[..len]); + u64::from_le_bytes(buf) + }) + .unwrap_or_default(); + + let events = db.open_tree(b"events")?; + + tracing::debug!(seq = %curr_seq, "initial sequence number"); + + while let Some((header, mut payload)) = event_rx.recv().await { + let seq_bump = matches!( + header.t.as_deref(), + Some("#commit") + | Some("#handle") + | Some("#identity") + | Some("#account") + | Some("#migrate") + | Some("#tombstone") + | Some("#labels") + ); + + if seq_bump { + curr_seq += 1; + payload.insert("seq".into(), Ipld::Integer(curr_seq as i128)); + db.insert(b"curr_seq", &u64::to_le_bytes(curr_seq))?; + } + + let mut cursor = Cursor::new(Vec::with_capacity(1024 * 1024)); + serde_ipld_dagcbor::to_writer(&mut cursor, &header)?; + serde_ipld_dagcbor::to_writer(&mut cursor, &payload)?; + + let data = Bytes::from(cursor.into_inner()); + if seq_bump { + server.raw_block_tx.send(data.clone())?; + events.insert(u64::to_be_bytes(curr_seq), &*data)?; + } else { + server.raw_block_tx.send(data)?; + } + } + + Ok(()) +} + +pub fn start_sequencer(server: Arc, event_rx: mpsc::Receiver) { + tokio::task::spawn(async move { + if let Err(e) = run_sequencer(server, event_rx).await { + tracing::error!("sequencer error: {e:?}"); + } + }); +} diff --git a/src/wire_proto.rs b/src/wire_proto.rs new file mode 100644 index 0000000..42382a6 --- /dev/null +++ b/src/wire_proto.rs @@ -0,0 +1,13 @@ +use std::collections::BTreeMap; + +use ipld_core::ipld::Ipld; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct SubscriptionHeader { + pub op: i64, + #[serde(default)] + pub t: Option, +} + +pub type StreamingEvent = (SubscriptionHeader, BTreeMap);