78 lines
2.3 KiB
TypeScript
78 lines
2.3 KiB
TypeScript
import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
|
|
import { ExportEntry } from "./directory-tailer.ts";
|
|
|
|
import "./write-compacted.ts";
|
|
import "./write-live.ts";
|
|
|
|
/**
 * Reads newline-delimited records from `stream` and appends the parsed entry
 * of every record belonging to `did` onto `operations`.
 *
 * Each record is a line of U+001F-separated fields; the first field is the DID
 * without its "did:plc:" prefix and the third field is the JSON-encoded entry.
 *
 * @param stream     Raw bytes of a record file (or of a decompressor's stdout).
 * @param did        Full identifier, including the "did:plc:" prefix.
 * @param operations Output array; matching entries are pushed in stream order.
 * @throws SyntaxError when a matching line has no entry field or the entry is
 *         not valid JSON.
 */
const addOperations = async (
  stream: ReadableStream<Uint8Array>,
  did: string,
  operations: ExportEntry[]
) => {
  const lines = stream
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream());

  const separator = "\u001f";
  // Include the separator in the match so that only a record whose first field
  // is exactly this DID is accepted, never one that merely shares a prefix.
  const linePrefix = did.substring("did:plc:".length) + separator;

  for await (const line of lines.values()) {
    if (!line.startsWith(linePrefix)) continue;

    const [_did, _createdAt, rawEntry] = line.split(separator, 4);
    if (rawEntry === undefined) {
      throw new SyntaxError(
        `malformed record for ${did}: missing entry field`
      );
    }

    // NOTE(review): the parsed value is trusted to have the ExportEntry shape;
    // nothing here validates it.
    operations.push(JSON.parse(rawEntry) as ExportEntry);
  }
};
|
|
|
|
/**
 * Collects every stored operation for a single DID.
 *
 * Sources are scanned in this order, and results are appended in the same
 * order:
 *   1. each directory under `./data/plc/compacted` (sorted by name), reading
 *      the zstd-compressed shard `<prefix>.zst` through the `zstd` CLI;
 *   2. the plain shard file `./data/plc/compacting/<prefix>`;
 *   3. the plain shard file `./data/plc/live/<prefix>`.
 *
 * `<prefix>` is the first two characters following "did:plc:".
 *
 * @param did Full identifier, including the "did:plc:" prefix.
 * @returns   The matching entries in the order they were encountered.
 */
export const getOperations = async (did: string): Promise<ExportEntry[]> => {
  const operations: ExportEntry[] = [];

  const didplc = "did:plc:".length;
  const prefix = did.substring(didplc, didplc + 2);

  const compactedEntries = await Array.fromAsync(
    Deno.readDir("./data/plc/compacted")
  );
  compactedEntries.sort((a, b) =>
    a.name < b.name ? -1 : a.name > b.name ? 1 : 0
  );

  for (const entry of compactedEntries) {
    // TODO: if we assume that compacted files are *sorted*, we get ordering by did and createdAt,
    // which gives us a complete op log for each did. we can store a little size prefix for a block
    // and seek over dids we don't care about, giving us whole contiguous op logs in a compacted file.
    // but for now we just un-zstd it and skip individual lines we don't care about the same as a live file

    const shardPath = `./data/plc/compacted/${entry.name}/${prefix}.zst`;
    const process = new Deno.Command("zstd", {
      args: ["-d", shardPath, "--stdout"],
      cwd: Deno.cwd(),
      stdout: "piped",
      stdin: "null",
      stderr: "piped",
    }).spawn();

    // Drain stderr while stdout is being consumed: a piped stream that is never
    // read can fill up and stall the child, and its message is the only clue
    // when decompression fails.
    const [, stderrText, status] = await Promise.all([
      addOperations(process.stdout, did, operations),
      new Response(process.stderr).text(),
      process.status,
    ]);

    if (!status.success) {
      // Best effort: report the failed shard and keep going.
      console.warn(
        `zstd exited with code ${status.code} for ${shardPath}: ${stderrText.trim()}`
      );
    }
  }

  for (const dir of ["compacting", "live"]) {
    const shardPath = `./data/plc/${dir}/${prefix}`;
    try {
      const f = await Deno.open(shardPath, { read: true });
      await addOperations(f.readable, did, operations);
    } catch (err) {
      // A shard that does not exist simply contributes nothing.
      if (err instanceof Deno.errors.NotFound) continue;
      // Anything else (permissions, corrupt record, ...) is still non-fatal,
      // but should not vanish without a trace.
      console.warn(`failed to read ${shardPath}:`, err);
    }
  }

  return operations;
};
|
|
|
|
// Command-line entry point: looks up the DID given as the first argument,
// prints its operations, then prints the elapsed time in milliseconds.
if (import.meta.main) {
  const did = Deno.args[0];
  if (did === undefined) {
    console.error("usage: pass a did:plc identifier as the first argument");
    Deno.exit(1);
  }

  // First lookup is untimed and its result discarded; presumably a warm-up so
  // the measured run below is not dominated by cold caches.
  await getOperations(did);

  const then = performance.now();
  console.log(await getOperations(did));
  console.log(performance.now() - then);
}
|