65 lines
1.7 KiB
TypeScript
65 lines
1.7 KiB
TypeScript
import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
|
|
import { ExportEntry } from "./directory-tailer.ts";
|
|
|
|
/**
 * Appends to `out` every record from a stream of JSON lines whose `did`
 * field equals `did`.
 *
 * Blank lines and JSON values that are not objects are skipped. A line that
 * is not valid JSON makes `JSON.parse` throw; that error propagates to the
 * caller.
 */
const collectOperations = async (
  lines: AsyncIterable<string>,
  did: string,
  out: ExportEntry[],
): Promise<void> => {
  for await (const line of lines) {
    if (line.trim() === "") continue;

    const parsed: unknown = JSON.parse(line);
    if (typeof parsed !== "object" || parsed === null) continue;
    if ((parsed as { did?: unknown }).did !== did) continue;

    // Only `did` is checked at runtime; the remaining fields are trusted to
    // have the ExportEntry shape.
    out.push(parsed as ExportEntry);
  }
};

/**
 * Collects every stored entry for `did`.
 *
 * The two characters that follow the "did:plc:" prefix select which files are
 * read. For each directory under `./data/plc/compacted` (visited in ascending
 * name order) the archive `<prefix>.zst` is decompressed with the `zstd`
 * command and scanned line by line; afterwards the plain-text file
 * `./data/plc/live/<prefix>` is scanned the same way.
 *
 * Reading is best-effort: a `zstd` run that exits non-zero, or a live file
 * that cannot be read completely, is reported with `console.warn` and whatever
 * was collected so far is kept. A missing live file is ignored silently.
 *
 * @param did The identifier to look up.
 * @returns Matching entries, compacted directories first, live file last.
 * @throws If `./data/plc/compacted` cannot be listed, `zstd` cannot be
 *   spawned, or a decompressed line is not valid JSON.
 */
export const getOperations = async (did: string): Promise<ExportEntry[]> => {
  const operations: ExportEntry[] = [];

  const didplc = "did:plc:".length;
  const prefix = did.substring(didplc, didplc + 2);

  const compactedDir = "./data/plc/compacted";
  const compactedNames: string[] = [];
  for await (const dirEntry of Deno.readDir(compactedDir)) {
    compactedNames.push(dirEntry.name);
  }
  // Default string sort orders by UTF-16 code units, same as `<` / `>`.
  compactedNames.sort();

  for (const name of compactedNames) {
    const archive = `${compactedDir}/${name}/${prefix}.zst`;
    const child = new Deno.Command("zstd", {
      args: ["-d", archive, "--stdout"],
      cwd: Deno.cwd(),
      stdout: "piped",
      stdin: "null",
      stderr: "piped",
    }).spawn();

    // Drain stderr concurrently with stdout: a piped stream that is never
    // read keeps its pipe open and can stall the child once the pipe fills.
    const stderrText = new Response(child.stderr).text().catch(() => "");

    const lines = child.stdout
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new TextLineStream());
    await collectOperations(lines.values(), did, operations);

    const [status, stderrOutput] = await Promise.all([
      child.status,
      stderrText,
    ]);
    if (!status.success) {
      console.warn(
        `getOperations: zstd exited with code ${status.code} for ${archive}: ${stderrOutput.trim()}`,
      );
    }
  }

  const livePath = `./data/plc/live/${prefix}`;
  try {
    const f = await Deno.open(livePath, { read: true });
    const lines = f.readable
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new TextLineStream());
    await collectOperations(lines.values(), did, operations);
  } catch (err) {
    // A missing live file just means there is nothing more to add. Any other
    // failure is still non-fatal, but should not vanish without a trace.
    if (!(err instanceof Deno.errors.NotFound)) {
      console.warn(`getOperations: could not fully read ${livePath}:`, err);
    }
  }

  return operations;
};
|
|
|
|
// Script entry point: looks up the DID given as the first command-line
// argument, prints the matching operations, then prints how long the lookup
// took in milliseconds.
if (import.meta.main) {
  const did = Deno.args[0];
  if (did === undefined) {
    // Without this check the failure surfaces as a TypeError from inside
    // getOperations, which says nothing about the missing argument.
    console.error("usage: pass the DID to look up as the first argument");
    Deno.exit(2);
  }

  // Untimed first run — presumably a warm-up so the measurement below is not
  // dominated by cold caches.
  await getOperations(did);

  const then = performance.now();
  console.log(await getOperations(did));
  console.log(performance.now() - then);
}
|