diff --git a/deno.json b/deno.json index 20448e2..fe9a340 100644 --- a/deno.json +++ b/deno.json @@ -1,7 +1,6 @@ { "tasks": { - "full-scan": "deno run -A --unstable-kv ./full-scan.ts", - "scrape": "deno run -A --unstable-kv ./scrape.ts" + "scrape": "deno run -A ./scrape.ts" }, "imports": { "@std/assert": "jsr:@std/assert@1" diff --git a/deno.lock b/deno.lock index a62576f..112c323 100644 --- a/deno.lock +++ b/deno.lock @@ -1,13 +1,24 @@ { "version": "4", "specifiers": { + "jsr:@std/fs@1": "1.0.6", "jsr:@std/json@1": "1.0.1", + "jsr:@std/path@^1.0.8": "1.0.8", "jsr:@std/streams@1": "1.0.8" }, "jsr": { + "@std/fs@1.0.6": { + "integrity": "42b56e1e41b75583a21d5a37f6a6a27de9f510bcd36c0c85791d685ca0b85fa2", + "dependencies": [ + "jsr:@std/path" + ] + }, "@std/json@1.0.1": { "integrity": "1f0f70737e8827f9acca086282e903677bc1bb0c8ffcd1f21bca60039563049f" }, + "@std/path@1.0.8": { + "integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be" + }, "@std/streams@1.0.8": { "integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3" } diff --git a/directory-tailer.ts b/directory-tailer.ts index 85b2856..e577bce 100644 --- a/directory-tailer.ts +++ b/directory-tailer.ts @@ -16,16 +16,13 @@ export class DirectoryTailer { public abort = new AbortController(); latestDate: string | undefined; + saveRaw: boolean = true; // set to false in production so you don't double-store plc data - constructor( - public kv: Deno.Kv, - startDate?: string - ) { + constructor(startDate?: string) { this.latestDate = startDate; } - async processRecord(entry: ExportEntry) { - await this.kv.set([entry.did, entry.cid], entry); + async processRecord(entry: ExportEntry, raw: string) { console.log( Deno.inspect( { @@ -36,6 +33,14 @@ export class DirectoryTailer { { breakLength: Infinity, compact: true, colors: true } ) ); + + const didplc = "did:plc:".length; + const prefix = entry.did.substring(didplc, didplc + 2); + const out = "./data/plc/live/" + prefix; + + await Deno.writeTextFile(out, raw + "\n", { + append: true, + }); } async fetchExports() { @@ -58,24 +63,34 @@ export class DirectoryTailer { } const text = await response.text(); - await Deno.writeTextFile("data/exports.jsonl", text + "\n", { - append: true, - }); + + if (this.saveRaw) { + await Deno.writeTextFile("data/exports.jsonl", text + "\n", { + append: true, + }); + } let entry: ExportEntry | undefined; + const promises = []; for (const line of new IterLines(text)) { entry = JSON.parse(line) as unknown as ExportEntry; - await this.processRecord(entry); + this.latestDate = entry.createdAt; + promises.push(this.processRecord(entry, line)); } + await Promise.all(promises); if (entry) { this.latestDate = entry.createdAt; + const write = Deno.writeTextFile("./data/latest-date", this.latestDate); + const timestamp = new Date(this.latestDate).getTime(); if (Date.now() - timestamp > 5_000) { await sleep(600); // 500 per 5 minutes } else { await sleep(2500); // sleep a little longer so that we can get more ops per request } + + await write; } else { await sleep(10_000); // we got nothing! sleep way longer } diff --git a/get-did-document.ts b/get-did-document.ts deleted file mode 100644 index 29af296..0000000 --- a/get-did-document.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { ExportEntry } from "./directory-tailer.ts"; -import { kv } from "./main.ts"; - -export const getDidDocument = async (did: string) => { - const listing = kv.list({ prefix: [did] }); - for await (const { value: operation } of listing) { - if (operation.nullified) continue; - console.log(operation); - } -}; diff --git a/get-operations.ts b/get-operations.ts new file mode 100644 index 0000000..1255011 --- /dev/null +++ b/get-operations.ts @@ -0,0 +1,59 @@ +import { TextLineStream } from "jsr:@std/streams@1/text-line-stream"; +import { ExportEntry } from "./directory-tailer.ts"; + +export const getOperations = async (did: string) => { + const operations = []; + + const didplc = "did:plc:".length; + const prefix = did.substring(didplc, didplc + 2); + + const compactedEntries = await Array.fromAsync( + Deno.readDir("./data/plc/compacted") + ); + compactedEntries.sort(); + for (const entry of compactedEntries) { + const process = new Deno.Command("zstd", { + args: [ + "-d", + `./data/plc/compacted/${entry.name}/${prefix}.zst`, + "--stdout", + ], + cwd: Deno.cwd(), + stdout: "piped", + stdin: "null", + stderr: "piped", + }).spawn(); + + const lines = process.stdout + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new TextLineStream()); + + for await (const line of lines.values()) { + const entry = JSON.parse(line) as unknown as ExportEntry; + if (entry.did !== did) continue; + operations.push(entry); + } + + await process.status; + } + + const f = await Deno.open(`./data/plc/live/${prefix}`, { read: true }); + const lines = f.readable + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new TextLineStream()); + for await (const line of lines.values()) { + const entry = JSON.parse(line) as unknown as ExportEntry; + if (entry.did !== did) continue; + operations.push(entry); + } + + return operations; +}; + +if (import.meta.main) { + await getOperations(Deno.args[0]); + + const then = performance.now(); + console.log(await getOperations(Deno.args[0])); + console.log(performance.now() - then); +} diff --git a/main.ts b/main.ts index fa46cb3..795a8ba 100644 --- a/main.ts +++ b/main.ts @@ -1,4 +1,7 @@ +import { ensureDir } from "jsr:@std/fs@1"; import { DirectoryTailer } from "./directory-tailer.ts"; -export const kv = await Deno.openKv("./data/directory-kv.db"); -export const tailer = new DirectoryTailer(kv); +await ensureDir("./data/plc/compacted"); +await ensureDir("./data/plc/live"); + +export const tailer = new DirectoryTailer(); diff --git a/full-scan.ts b/scan.ts similarity index 62% rename from full-scan.ts rename to scan.ts index a97855b..aad2b5e 100644 --- a/full-scan.ts +++ b/scan.ts @@ -1,16 +1,28 @@ import { TextLineStream } from "jsr:@std/streams@1/text-line-stream"; - import { ExportEntry } from "./directory-tailer.ts"; import { tailer } from "./main.ts"; export const fullScan = async () => { using exports = await Deno.open("./data/exports.jsonl", { read: true }); + + // interrupted at 2024-11-12T21:33:47.118Z + // byte offset + await exports.seek(13526812085, Deno.SeekMode.Start); + const lineStream = exports.readable .pipeThrough(new TextDecoderStream()) .pipeThrough(new TextLineStream()); + + { + const reader = lineStream.getReader(); + const line = await reader.read(); + console.log("dropping: " + line.value); + reader.releaseLock(); + } + for await (const line of lineStream.values()) { const entry = JSON.parse(line) as unknown as ExportEntry; - await tailer.processRecord(entry); + await tailer.processRecord(entry, line); } };