implement simple compaction

This commit is contained in:
Charlotte Som 2025-01-04 05:30:10 +00:00
parent e027bc234e
commit c11ab303d2
3 changed files with 60 additions and 17 deletions

View file

@ -13,9 +13,11 @@ const addOperations = async (
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream());
const didData = did.substring("did:plc:".length);
for await (const line of lines.values()) {
if (!line.startsWith(did)) continue;
const [_did, _createdAt, _cid, rawEntry] = line.split("\u001f", 4);
if (!line.startsWith(didData)) continue;
const [_did, _createdAt, rawEntry] = line.split("\u001f", 4);
const entry = JSON.parse(rawEntry) as unknown as ExportEntry;
operations.push(entry);
}

View file

@ -1,10 +1,39 @@
import { ensureDir } from "jsr:@std/fs@1";
// TODO: automate compaction here
// 1. take note of latest-date
// 2. move data/plc/live/* to data/plc/compacting/*
// 3. zstd data/plc/compacting/*
// 4. move data/plc/compacting/*.zstd to data/plc/compacted/<date>/*.zstd
import { ensureDir, exists, expandGlob, move } from "jsr:@std/fs@1";
await ensureDir("./data/plc/compacted");
await ensureDir("./data/plc/compacting");
export async function createCompactedIndex() {
if (await exists("./data/plc/compacting"))
throw new Error("compaction already running!");
const date = new Date(await Deno.readTextFile("./data/latest-date"));
await move("./data/plc/live", "./data/plc/compacting");
await ensureDir("./data/plc/live");
const inputFiles = await Array.fromAsync(
expandGlob("./data/plc/compacting/*")
);
const process = new Deno.Command("zstd", {
args: inputFiles.map((it) => `./data/plc/compacting/${it.name}`),
cwd: Deno.cwd(),
stdout: "piped",
stdin: "null",
stderr: "piped",
}).spawn();
await process.status;
// TODO: run zstd *
const compactedDir =
"./data/plc/compacted/" +
date.toISOString().split("T")[0].replaceAll("-", "");
await move("./data/plc/compacting", compactedDir);
await Promise.all(
inputFiles.map((it) => Deno.remove(`${compactedDir}/${it.name}`))
);
}
if (import.meta.main) {
await createCompactedIndex();
}

View file

@ -9,11 +9,23 @@ export async function writeLiveRecord(entry: ExportEntry, raw: string) {
const prefix = entry.did.substring(didplc, didplc + 2);
const out = "./data/plc/live/" + prefix;
await Deno.writeTextFile(
out,
[entry.did, entry.createdAt, entry.cid, raw].join("\u001f") + "\n",
{
append: true,
}
);
try {
await Deno.writeTextFile(
out,
[entry.did.substring(didplc), entry.createdAt, raw].join("\u001f") + "\n",
{
append: true,
}
);
} catch {
// directory probably does not exist, just try again
await ensureDir("./data/plc/live");
await Deno.writeTextFile(
out,
[entry.did.substring(didplc), entry.createdAt, raw].join("\u001f") + "\n",
{
append: true,
}
);
}
}