Reviewed patch. Line breaks and 2-space indentation were reconstructed from a
whitespace-collapsed copy; if context lines do not match the working tree, try
`git apply --ignore-whitespace`. Added lines were revised during review, so the
post-image blob ids on the `index` lines no longer describe this content.

diff --git a/get-operations.ts b/get-operations.ts
index a0fb058..ce1290d 100644
--- a/get-operations.ts
+++ b/get-operations.ts
@@ -13,9 +13,14 @@ const addOperations = async (
     .pipeThrough(new TextDecoderStream())
     .pipeThrough(new TextLineStream());
 
+  // Stored lines are matched on the DID without its "did:plc:" prefix. Include
+  // the field separator so only the complete key matches, never a prefix of it.
+  const didKey = did.substring("did:plc:".length) + "\u001f";
+
   for await (const line of lines.values()) {
-    if (!line.startsWith(did)) continue;
-    const [_did, _createdAt, _cid, rawEntry] = line.split("\u001f", 4);
+    if (!line.startsWith(didKey)) continue;
+    // Fields: DID key, createdAt, raw JSON entry (U+001F-separated).
+    const [_did, _createdAt, rawEntry] = line.split("\u001f", 3);
     const entry = JSON.parse(rawEntry) as unknown as ExportEntry;
     operations.push(entry);
   }
diff --git a/write-compacted.ts b/write-compacted.ts
index 1de90b8..0606ecd 100644
--- a/write-compacted.ts
+++ b/write-compacted.ts
@@ -1,10 +1,63 @@
-import { ensureDir } from "jsr:@std/fs@1";
-
-// TODO: automate compaction here
-// 1. take note of latest-date
-// 2. move data/plc/live/* to data/plc/compacting/*
-// 3. zstd data/plc/compacting/*
-// 4. move data/plc/compacting/*.zstd to data/plc/compacted//*.zstd
+import { ensureDir, exists, expandGlob, move } from "jsr:@std/fs@1";
 
 await ensureDir("./data/plc/compacted");
-await ensureDir("./data/plc/compacting");
+
+/**
+ * Rotates the live index into a dated, zstd-compressed archive directory.
+ *
+ * Steps: move `./data/plc/live` to `./data/plc/compacting`, recreate an empty
+ * live directory, run the `zstd` CLI over every moved file, move the directory
+ * to `./data/plc/compacted/YYYYMMDD` (date read from `./data/latest-date`),
+ * then delete the uncompressed inputs from it.
+ *
+ * @throws Error if `./data/plc/compacting` already exists, if
+ *   `./data/latest-date` does not parse as a date, or if `zstd` exits with a
+ *   non-zero status (the inputs then stay in `./data/plc/compacting`).
+ */
+export async function createCompactedIndex(): Promise<void> {
+  if (await exists("./data/plc/compacting"))
+    throw new Error("compaction already running!");
+
+  // Validate the date before moving anything, so a bad value cannot strand
+  // the live data in ./data/plc/compacting.
+  const rawDate = (await Deno.readTextFile("./data/latest-date")).trim();
+  const date = new Date(rawDate);
+  if (Number.isNaN(date.getTime()))
+    throw new Error(`invalid date in ./data/latest-date: ${rawDate}`);
+
+  await move("./data/plc/live", "./data/plc/compacting");
+  await ensureDir("./data/plc/live");
+
+  const inputFiles = await Array.fromAsync(
+    expandGlob("./data/plc/compacting/*")
+  );
+
+  // With no file arguments zstd would read stdin, so skip it when empty.
+  if (inputFiles.length > 0) {
+    // Inherit stdout/stderr: piped-but-unread output can stall the child once
+    // the pipe buffer fills, and zstd's diagnostics would be lost.
+    const { success, code } = await new Deno.Command("zstd", {
+      args: inputFiles.map((it) => `./data/plc/compacting/${it.name}`),
+      cwd: Deno.cwd(),
+      stdin: "null",
+      stdout: "inherit",
+      stderr: "inherit",
+    }).spawn().status;
+    // The inputs are deleted below, so never continue after a failed run.
+    if (!success) throw new Error(`zstd exited with code ${code}`);
+  }
+
+  const compactedDir =
+    "./data/plc/compacted/" +
+    date.toISOString().split("T")[0].replaceAll("-", "");
+  await move("./data/plc/compacting", compactedDir);
+
+  // Drop the uncompressed originals; zstd leaves them next to its output.
+  await Promise.all(
+    inputFiles.map((it) => Deno.remove(`${compactedDir}/${it.name}`))
+  );
+}
+
+if (import.meta.main) {
+  await createCompactedIndex();
+}
diff --git a/write-live.ts b/write-live.ts
index 021b50d..c1a237e 100644
--- a/write-live.ts
+++ b/write-live.ts
@@ -9,11 +9,17 @@ export async function writeLiveRecord(entry: ExportEntry, raw: string) {
   const prefix = entry.did.substring(didplc, didplc + 2);
   const out = "./data/plc/live/" + prefix;
 
-  await Deno.writeTextFile(
-    out,
-    [entry.did, entry.createdAt, entry.cid, raw].join("\u001f") + "\n",
-    {
-      append: true,
-    }
-  );
+  // One line per record: DID suffix, createdAt and raw entry, U+001F-separated.
+  const record =
+    [entry.did.substring(didplc), entry.createdAt, raw].join("\u001f") + "\n";
+
+  try {
+    await Deno.writeTextFile(out, record, { append: true });
+  } catch {
+    // directory probably does not exist, just try again
+    // NOTE(review): this file must import ensureDir; the patch adds no such
+    // import, so confirm one already exists.
+    await ensureDir("./data/plc/live");
+    await Deno.writeTextFile(out, record, { append: true });
+  }
 }