Compare commits

2 commits

e027bc234e  use did/createdAt/cid prefix on written records  2025-01-03 06:45:38 +00:00

    this means we can skip over dids we don't care about without parsing
    JSON *AND* we get good lexicographic sorting of log files (which will be
    useful for more complex compaction later)

0d2412813f  filter duplicate cids  2024-11-30 20:17:49 +00:00
6 changed files with 80 additions and 40 deletions
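The prefix format from the first commit is what makes cheap per-did filtering possible: every written line starts with the did, so a reader can reject lines with a plain string comparison and only pay for JSON.parse on records it actually wants. A minimal sketch of that read path in the same Deno/TypeScript style (the helper name is hypothetical; the \u001f-separated field order comes from writeLiveRecord below):

import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";

// Hypothetical helper: collect the raw JSON records for one did from a single
// live log file, skipping other dids without parsing their JSON.
async function rawRecordsForDid(path: string, did: string): Promise<string[]> {
  const out: string[] = [];
  const f = await Deno.open(path, { read: true });
  const lines = f.readable
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream());
  for await (const line of lines.values()) {
    // every line is "<did>\u001f<createdAt>\u001f<cid>\u001f<raw json>", so a
    // prefix check is enough to reject lines we don't care about
    if (!line.startsWith(did)) continue;
    const [_did, _createdAt, _cid, raw] = line.split("\u001f", 4);
    out.push(raw);
  }
  return out;
}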

@@ -1,4 +1,5 @@
 import { IterLines } from "./util.ts";
+import { writeLiveRecord } from "./write-live.ts";
 
 type PlcOperation = unknown;
@@ -15,6 +16,8 @@ const sleep = (timeout: number) => new Promise((r) => setTimeout(r, timeout));
 
 export class DirectoryTailer {
   public abort = new AbortController();
+  lastBatchCIDs = new Set<string>();
   latestDate: string | undefined;
   saveRaw: boolean = true; // set to false in production so you don't double-store plc data
@@ -34,13 +37,7 @@ export class DirectoryTailer {
       )
     );
-    const didplc = "did:plc:".length;
-    const prefix = entry.did.substring(didplc, didplc + 2);
-    const out = "./data/plc/live/" + prefix;
-    await Deno.writeTextFile(out, raw + "\n", {
-      append: true,
-    });
+    await writeLiveRecord(entry, raw);
   }
 
   async fetchExports() {
@@ -72,13 +69,19 @@ export class DirectoryTailer {
     let entry: ExportEntry | undefined;
     const promises = [];
+    const cids = new Set<string>();
     for (const line of new IterLines(text)) {
       entry = JSON.parse(line) as unknown as ExportEntry;
+      if (this.lastBatchCIDs.has(entry.cid)) continue;
       this.latestDate = entry.createdAt;
+      cids.add(entry.cid);
       promises.push(this.processRecord(entry, line));
     }
     await Promise.all(promises);
+    this.lastBatchCIDs = cids;
 
     if (entry) {
       this.latestDate = entry.createdAt;
       const write = Deno.writeTextFile("./data/latest-date", this.latestDate);

@@ -1,8 +1,28 @@
 import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
 import { ExportEntry } from "./directory-tailer.ts";
+import "./write-compacted.ts";
+import "./write-live.ts";
+
+const addOperations = async (
+  stream: ReadableStream<Uint8Array>,
+  did: string,
+  operations: ExportEntry[]
+) => {
+  const lines = stream
+    .pipeThrough(new TextDecoderStream())
+    .pipeThrough(new TextLineStream());
+  for await (const line of lines.values()) {
+    if (!line.startsWith(did)) continue;
+    const [_did, _createdAt, _cid, rawEntry] = line.split("\u001f", 4);
+    const entry = JSON.parse(rawEntry) as unknown as ExportEntry;
+    operations.push(entry);
+  }
+};
+
 export const getOperations = async (did: string) => {
-  const operations = [];
+  const operations: ExportEntry[] = [];
   const didplc = "did:plc:".length;
   const prefix = did.substring(didplc, didplc + 2);
@@ -14,6 +34,11 @@ export const getOperations = async (did: string) => {
     a.name < b.name ? -1 : a.name > b.name ? 1 : 0
   );
   for (const entry of compactedEntries) {
+    // TODO: if we assume that compacted files are *sorted*, we get ordering by did and createdAt,
+    // which gives us a complete op log for each did. we can store a little size prefix for a block
+    // and seek over dids we don't care about, giving us whole contiguous op logs in a compacted file.
+    // but for now we just un-zstd it and skip individual lines we don't care about the same as a live file
     const process = new Deno.Command("zstd", {
       args: [
         "-d",
@@ -26,31 +51,17 @@ export const getOperations = async (did: string) => {
       stderr: "piped",
     }).spawn();
-    const lines = process.stdout
-      .pipeThrough(new TextDecoderStream())
-      .pipeThrough(new TextLineStream());
-    for await (const line of lines.values()) {
-      const entry = JSON.parse(line) as unknown as ExportEntry;
-      if (entry.did !== did) continue;
-      operations.push(entry);
-    }
+    await addOperations(process.stdout, did, operations);
     await process.status;
   }
-  try {
-    const f = await Deno.open(`./data/plc/live/${prefix}`, { read: true });
-    const lines = f.readable
-      .pipeThrough(new TextDecoderStream())
-      .pipeThrough(new TextLineStream());
-    for await (const line of lines.values()) {
-      const entry = JSON.parse(line) as unknown as ExportEntry;
-      if (entry.did !== did) continue;
-      operations.push(entry);
-    }
-  } catch (_err) {
-    // ignore
-  }
+  for (const dir of ["compacting", "live"]) {
+    try {
+      const f = await Deno.open(`./data/plc/${dir}/${prefix}`, { read: true });
+      await addOperations(f.readable, did, operations);
+    } catch (_err) {
+      // ignore
+    }
+  }
   return operations;
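The TODO in the hunk above sketches a possible future layout for compacted files: records sorted by did and createdAt, with a small size prefix per did block so a reader can seek straight over dids it doesn't need. Purely as illustration, a reader for that kind of layout might look like the following; the block header format (did and byte length separated by \u001f, terminated by a newline) is an assumption, not something this change defines:

// Illustrative only: assumes each block is "<did>\u001f<byteLength>\n" followed
// by <byteLength> bytes of newline-delimited records for that did.
async function readDidBlock(path: string, did: string): Promise<Uint8Array | undefined> {
  const bytes = await Deno.readFile(path);
  const decoder = new TextDecoder();
  let offset = 0;
  while (offset < bytes.length) {
    const headerEnd = bytes.indexOf(0x0a, offset); // end of the block header line
    if (headerEnd === -1) break;
    const header = decoder.decode(bytes.subarray(offset, headerEnd));
    const [blockDid, byteLength] = header.split("\u001f");
    const bodyStart = headerEnd + 1;
    const bodyEnd = bodyStart + Number(byteLength);
    if (blockDid === did) return bytes.subarray(bodyStart, bodyEnd);
    offset = bodyEnd; // seek straight over blocks for dids we don't care about
  }
  return undefined;
}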

@@ -1,7 +1,3 @@
-import { ensureDir } from "jsr:@std/fs@1";
 import { DirectoryTailer } from "./directory-tailer.ts";
-
-await ensureDir("./data/plc/compacted");
-await ensureDir("./data/plc/live");
 
 export const tailer = new DirectoryTailer();

@@ -17,14 +17,15 @@ export const catchUp = async () => {
     lineReader.releaseLock();
   }
-  let lastLine: string | undefined;
+  tailer.lastBatchCIDs.clear();
   for await (const line of lineStream.values()) {
-    lastLine = line;
-  }
-
-  if (lastLine) {
-    const entry = JSON.parse(lastLine) as unknown as ExportEntry;
-    tailer.latestDate = entry.createdAt;
+    try {
+      const entry = JSON.parse(line) as unknown as ExportEntry;
+      tailer.latestDate = entry.createdAt;
+      tailer.lastBatchCIDs.add(entry.cid);
+    } catch (_err) {
+      // ignore
+    }
   }
 };

write-compacted.ts  Normal file  (+10)

@@ -0,0 +1,10 @@
+import { ensureDir } from "jsr:@std/fs@1";
+
+// TODO: automate compaction here
+// 1. take note of latest-date
+// 2. move data/plc/live/* to data/plc/compacting/*
+// 3. zstd data/plc/compacting/*
+// 4. move data/plc/compacting/*.zstd to data/plc/compacted/<date>/*.zstd
+
+await ensureDir("./data/plc/compacted");
+await ensureDir("./data/plc/compacting");
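The four TODO steps above could be driven by a small job along these lines. This is a hedged sketch only: the compaction routine does not exist in this change, and the .zst extension and per-date directory name are assumptions based on the TODO:

import { ensureDir } from "jsr:@std/fs@1";

// Hypothetical compaction job following the TODO in write-compacted.ts.
export async function compactLiveFiles() {
  // 1. take note of latest-date; use it to name the compacted snapshot
  const date = (await Deno.readTextFile("./data/latest-date")).trim();
  await ensureDir("./data/plc/compacting");
  await ensureDir(`./data/plc/compacted/${date}`);
  // 2. move data/plc/live/* to data/plc/compacting/*
  for await (const f of Deno.readDir("./data/plc/live")) {
    await Deno.rename(`./data/plc/live/${f.name}`, `./data/plc/compacting/${f.name}`);
  }
  // 3. zstd data/plc/compacting/* (--rm removes the uncompressed source)
  for await (const f of Deno.readDir("./data/plc/compacting")) {
    if (f.name.endsWith(".zst")) continue;
    await new Deno.Command("zstd", {
      args: ["--rm", `./data/plc/compacting/${f.name}`],
    }).output();
  }
  // 4. move the compressed files into data/plc/compacted/<date>/
  for await (const f of Deno.readDir("./data/plc/compacting")) {
    if (!f.name.endsWith(".zst")) continue;
    await Deno.rename(
      `./data/plc/compacting/${f.name}`,
      `./data/plc/compacted/${date}/${f.name}`
    );
  }
}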

write-live.ts  Normal file  (+19)

@@ -0,0 +1,19 @@
+import { ensureDir } from "jsr:@std/fs@1";
+import { ExportEntry } from "./directory-tailer.ts";
+
+await ensureDir("./data/plc/live");
+
+export async function writeLiveRecord(entry: ExportEntry, raw: string) {
+  const didplc = "did:plc:".length;
+  const prefix = entry.did.substring(didplc, didplc + 2);
+  const out = "./data/plc/live/" + prefix;
+  await Deno.writeTextFile(
+    out,
+    [entry.did, entry.createdAt, entry.cid, raw].join("\u001f") + "\n",
+    {
+      append: true,
+    }
+  );
+}