plc-replica/get-operations.ts

78 lines
2.3 KiB
TypeScript

import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
import { ExportEntry } from "./directory-tailer.ts";
import "./write-compacted.ts";
import "./write-live.ts";
const addOperations = async (
stream: ReadableStream<Uint8Array>,
did: string,
operations: ExportEntry[]
) => {
const lines = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream());
const didData = did.substring("did:plc:".length);
for await (const line of lines.values()) {
if (!line.startsWith(didData)) continue;
const [_did, _createdAt, rawEntry] = line.split("\u001f", 4);
const entry = JSON.parse(rawEntry) as unknown as ExportEntry;
operations.push(entry);
}
};
export const getOperations = async (did: string) => {
const operations: ExportEntry[] = [];
const didplc = "did:plc:".length;
const prefix = did.substring(didplc, didplc + 2);
const compactedEntries = await Array.fromAsync(
Deno.readDir("./data/plc/compacted")
);
compactedEntries.sort((a, b) =>
a.name < b.name ? -1 : a.name > b.name ? 1 : 0
);
for (const entry of compactedEntries) {
// TODO: if we assume that compacted files are *sorted*, we get ordering by did and createdAt,
// which gives us a complete op log for each did. we can store a little size prefix for a block
// and seek over dids we don't care about, giving us whole contiguous op logs in a compacted file.
// but for now we just un-zstd it and skip individual lines we don't care about the same as a live file
const process = new Deno.Command("zstd", {
args: [
"-d",
`./data/plc/compacted/${entry.name}/${prefix}.zst`,
"--stdout",
],
cwd: Deno.cwd(),
stdout: "piped",
stdin: "null",
stderr: "piped",
}).spawn();
await addOperations(process.stdout, did, operations);
await process.status;
}
for (const dir of ["compacting", "live"]) {
try {
const f = await Deno.open(`./data/plc/${dir}/${prefix}`, { read: true });
await addOperations(f.readable, did, operations);
} catch (_err) {
// ignore
}
}
return operations;
};
if (import.meta.main) {
await getOperations(Deno.args[0]);
const then = performance.now();
console.log(await getOperations(Deno.args[0]));
console.log(performance.now() - then);
}