diff --git a/directory-tailer.ts b/directory-tailer.ts index 24b6081..2e61959 100644 --- a/directory-tailer.ts +++ b/directory-tailer.ts @@ -1,4 +1,5 @@ import { IterLines } from "./util.ts"; +import { writeLiveRecord } from "./write-live.ts"; type PlcOperation = unknown; @@ -36,13 +37,7 @@ export class DirectoryTailer { ) ); - const didplc = "did:plc:".length; - const prefix = entry.did.substring(didplc, didplc + 2); - const out = "./data/plc/live/" + prefix; - - await Deno.writeTextFile(out, raw + "\n", { - append: true, - }); + await writeLiveRecord(entry, raw); } async fetchExports() { diff --git a/get-operations.ts b/get-operations.ts index ac0c295..a0fb058 100644 --- a/get-operations.ts +++ b/get-operations.ts @@ -1,8 +1,28 @@ import { TextLineStream } from "jsr:@std/streams@1/text-line-stream"; import { ExportEntry } from "./directory-tailer.ts"; +import "./write-compacted.ts"; +import "./write-live.ts"; + +const addOperations = async ( + stream: ReadableStream, + did: string, + operations: ExportEntry[] +) => { + const lines = stream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new TextLineStream()); + + for await (const line of lines.values()) { + if (!line.startsWith(did)) continue; + const [_did, _createdAt, _cid, rawEntry] = line.split("\u001f", 4); + const entry = JSON.parse(rawEntry) as unknown as ExportEntry; + operations.push(entry); + } +}; + export const getOperations = async (did: string) => { - const operations = []; + const operations: ExportEntry[] = []; const didplc = "did:plc:".length; const prefix = did.substring(didplc, didplc + 2); @@ -14,6 +34,11 @@ export const getOperations = async (did: string) => { a.name < b.name ? -1 : a.name > b.name ? 1 : 0 ); for (const entry of compactedEntries) { + // TODO: if we assume that compacted files are *sorted*, we get ordering by did and createdAt, + // which gives us a complete op log for each did. we can store a little size prefix for a block + // and seek over dids we don't care about, giving us whole contiguous op logs in a compacted file. + // but for now we just un-zstd it and skip individual lines we don't care about the same as a live file + const process = new Deno.Command("zstd", { args: [ "-d", @@ -26,31 +51,17 @@ export const getOperations = async (did: string) => { stderr: "piped", }).spawn(); - const lines = process.stdout - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new TextLineStream()); - - for await (const line of lines.values()) { - const entry = JSON.parse(line) as unknown as ExportEntry; - if (entry.did !== did) continue; - operations.push(entry); - } - + await addOperations(process.stdout, did, operations); await process.status; } - try { - const f = await Deno.open(`./data/plc/live/${prefix}`, { read: true }); - const lines = f.readable - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new TextLineStream()); - for await (const line of lines.values()) { - const entry = JSON.parse(line) as unknown as ExportEntry; - if (entry.did !== did) continue; - operations.push(entry); + for (const dir of ["compacting", "live"]) { + try { + const f = await Deno.open(`./data/plc/${dir}/${prefix}`, { read: true }); + await addOperations(f.readable, did, operations); + } catch (_err) { + // ignore } - } catch (_err) { - // ignore } return operations; diff --git a/main.ts b/main.ts index 795a8ba..2e7bcc2 100644 --- a/main.ts +++ b/main.ts @@ -1,7 +1,3 @@ -import { ensureDir } from "jsr:@std/fs@1"; import { DirectoryTailer } from "./directory-tailer.ts"; -await ensureDir("./data/plc/compacted"); -await ensureDir("./data/plc/live"); - export const tailer = new DirectoryTailer(); diff --git a/write-compacted.ts b/write-compacted.ts new file mode 100644 index 0000000..1de90b8 --- /dev/null +++ b/write-compacted.ts @@ -0,0 +1,10 @@ +import { ensureDir } from "jsr:@std/fs@1"; + +// TODO: automate compaction here +// 1. take note of latest-date +// 2. move data/plc/live/* to data/plc/compacting/* +// 3. zstd data/plc/compacting/* +// 4. move data/plc/compacting/*.zstd to data/plc/compacted//*.zstd + +await ensureDir("./data/plc/compacted"); +await ensureDir("./data/plc/compacting"); diff --git a/write-live.ts b/write-live.ts new file mode 100644 index 0000000..021b50d --- /dev/null +++ b/write-live.ts @@ -0,0 +1,19 @@ +import { ensureDir } from "jsr:@std/fs@1"; + +import { ExportEntry } from "./directory-tailer.ts"; + +await ensureDir("./data/plc/live"); + +export async function writeLiveRecord(entry: ExportEntry, raw: string) { + const didplc = "did:plc:".length; + const prefix = entry.did.substring(didplc, didplc + 2); + const out = "./data/plc/live/" + prefix; + + await Deno.writeTextFile( + out, + [entry.did, entry.createdAt, entry.cid, raw].join("\u001f") + "\n", + { + append: true, + } + ); +}