use did/createdAt/cid prefix on written records
this means we can skip over dids we don't care about without parsing JSON *AND* we get good lexicographic sorting of log files (which will be useful for more complex compaction later)
This commit is contained in:
parent 0d2412813f
commit e027bc234e

5 changed files with 64 additions and 33 deletions
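
For reference, the on-disk line format this commit settles on (mirroring write-live.ts below): the three sortable fields, then the raw JSON, joined with U+001F, the ASCII unit separator. A sketch, with the ExportEntry shape inferred from the fields the diff uses:

// sketch of the line format; valid JSON never contains a literal U+001F,
// since JSON requires control characters inside strings to be escaped
type ExportEntry = { did: string; createdAt: string; cid: string };

const encodeLine = (entry: ExportEntry, raw: string): string =>
  [entry.did, entry.createdAt, entry.cid, raw].join("\u001f") + "\n";

Because the did comes first and createdAt is an ISO-8601 timestamp, both claims in the message above (skipping without parsing, and useful lexicographic order) reduce to plain string comparison.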
directory-tailer.ts

@@ -1,4 +1,5 @@
 import { IterLines } from "./util.ts";
+import { writeLiveRecord } from "./write-live.ts";
 
 type PlcOperation = unknown;
 
@@ -36,13 +37,7 @@ export class DirectoryTailer {
       )
     );
 
-    const didplc = "did:plc:".length;
-    const prefix = entry.did.substring(didplc, didplc + 2);
-    const out = "./data/plc/live/" + prefix;
-
-    await Deno.writeTextFile(out, raw + "\n", {
-      append: true,
-    });
+    await writeLiveRecord(entry, raw);
   }
 
   async fetchExports() {
@@ -1,8 +1,28 @@
 import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
 import { ExportEntry } from "./directory-tailer.ts";
 
+import "./write-compacted.ts";
+import "./write-live.ts";
+
+const addOperations = async (
+  stream: ReadableStream<Uint8Array>,
+  did: string,
+  operations: ExportEntry[]
+) => {
+  const lines = stream
+    .pipeThrough(new TextDecoderStream())
+    .pipeThrough(new TextLineStream());
+
+  for await (const line of lines.values()) {
+    if (!line.startsWith(did)) continue;
+    const [_did, _createdAt, _cid, rawEntry] = line.split("\u001f", 4);
+    const entry = JSON.parse(rawEntry) as unknown as ExportEntry;
+    operations.push(entry);
+  }
+};
+
 export const getOperations = async (did: string) => {
-  const operations = [];
+  const operations: ExportEntry[] = [];
 
   const didplc = "did:plc:".length;
   const prefix = did.substring(didplc, didplc + 2);
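
A worked example of that read path (all values fabricated): the startsWith check rejects other DIDs before any JSON work, and the split limit of 4 is safe because the JSON payload can't contain a literal U+001F. Note that a JavaScript split limit truncates extra fields rather than folding them into the last element, so the format really does depend on that guarantee:

const did = "did:plc:aaaaaaaaaaaaaaaaaaaaaaaa"; // fabricated DID
const line =
  did +
  "\u001f2024-01-01T00:00:00.000Z\u001fbafyrei-fake-cid\u001f" +
  '{"did":"did:plc:aaaaaaaaaaaaaaaaaaaaaaaa","operation":{}}';

if (line.startsWith(did)) {
  const [_did, _createdAt, _cid, rawEntry] = line.split("\u001f", 4);
  console.log(JSON.parse(rawEntry)); // JSON.parse only runs for matching lines
}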
@@ -14,6 +34,11 @@ export const getOperations = async (did: string) => {
       a.name < b.name ? -1 : a.name > b.name ? 1 : 0
   );
   for (const entry of compactedEntries) {
+    // TODO: if we assume that compacted files are *sorted*, we get ordering by did and createdAt,
+    // which gives us a complete op log for each did. we can store a little size prefix for a block
+    // and seek over dids we don't care about, giving us whole contiguous op logs in a compacted file.
+    // but for now we just un-zstd it and skip individual lines we don't care about, the same as a live file
+
     const process = new Deno.Command("zstd", {
       args: [
         "-d",
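
The size-prefix idea in that TODO isn't implemented in this commit. One hypothetical shape for it: a compacted file becomes a sequence of [4-byte big-endian length][one DID's records] blocks, so a reader can seek past DIDs it doesn't want instead of filtering line by line. A sketch of the reading side, with the layout itself being the assumption:

// hypothetical block reader for the seek-over design the TODO describes;
// assumed layout: repeating [u32 big-endian byte length][block bytes],
// one block per DID, blocks sorted by DID
async function readFull(f: Deno.FsFile, buf: Uint8Array): Promise<boolean> {
  let off = 0;
  while (off < buf.length) {
    const n = await f.read(buf.subarray(off));
    if (n === null) {
      if (off === 0) return false; // clean EOF at a block boundary
      throw new Error("truncated block");
    }
    off += n;
  }
  return true;
}

async function* didBlocks(f: Deno.FsFile) {
  const header = new Uint8Array(4);
  while (await readFull(f, header)) {
    const size = new DataView(header.buffer).getUint32(0);
    // a real implementation could peek at the DID here and
    // `await f.seek(size, Deno.SeekMode.Current)` to skip unwanted blocks
    const block = new Uint8Array(size);
    if (!(await readFull(f, block))) throw new Error("truncated block");
    yield block;
  }
}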
@@ -26,31 +51,17 @@ export const getOperations = async (did: string) => {
       stderr: "piped",
     }).spawn();
 
-    const lines = process.stdout
-      .pipeThrough(new TextDecoderStream())
-      .pipeThrough(new TextLineStream());
-
-    for await (const line of lines.values()) {
-      const entry = JSON.parse(line) as unknown as ExportEntry;
-      if (entry.did !== did) continue;
-      operations.push(entry);
-    }
-
+    await addOperations(process.stdout, did, operations);
     await process.status;
   }
 
-  try {
-    const f = await Deno.open(`./data/plc/live/${prefix}`, { read: true });
-    const lines = f.readable
-      .pipeThrough(new TextDecoderStream())
-      .pipeThrough(new TextLineStream());
-    for await (const line of lines.values()) {
-      const entry = JSON.parse(line) as unknown as ExportEntry;
-      if (entry.did !== did) continue;
-      operations.push(entry);
+  for (const dir of ["compacting", "live"]) {
+    try {
+      const f = await Deno.open(`./data/plc/${dir}/${prefix}`, { read: true });
+      await addOperations(f.readable, did, operations);
+    } catch (_err) {
+      // ignore
     }
-  } catch (_err) {
-    // ignore
   }
 
   return operations;
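
Callers are unaffected by the refactor; usage stays something like the below (DID fabricated; this module's filename isn't captured on this page). Because compacted exports are read in sorted filename order, then compacting, then live shards, each DID's ops come back in roughly the order they were written:

const ops = await getOperations("did:plc:aaaaaaaaaaaaaaaaaaaaaaaa");
console.log(`${ops.length} ops, earliest: ${ops[0]?.createdAt}`);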

main.ts (4 changes)

@@ -1,7 +1,3 @@
-import { ensureDir } from "jsr:@std/fs@1";
 import { DirectoryTailer } from "./directory-tailer.ts";
 
-await ensureDir("./data/plc/compacted");
-await ensureDir("./data/plc/live");
-
 export const tailer = new DirectoryTailer();
write-compacted.ts (new file, 10 lines)

@@ -0,0 +1,10 @@
+import { ensureDir } from "jsr:@std/fs@1";
+
+// TODO: automate compaction here
+// 1. take note of latest-date
+// 2. move data/plc/live/* to data/plc/compacting/*
+// 3. zstd data/plc/compacting/*
+// 4. move data/plc/compacting/*.zstd to data/plc/compacted/<date>/*.zstd
+
+await ensureDir("./data/plc/compacted");
+await ensureDir("./data/plc/compacting");
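
A minimal sketch of how that compaction TODO could be automated, assuming the zstd CLI is on PATH; the function name, the use of today's date as the "<date>" stamp, and the .zst extension (what the zstd CLI actually writes) are all guesses, not part of this commit:

import { ensureDir } from "jsr:@std/fs@1";

// hypothetical compaction pass following the four TODO steps above
export async function compactLiveFiles() {
  const date = new Date().toISOString().slice(0, 10); // 1. stand-in for "latest-date"
  await ensureDir("./data/plc/compacting");

  // 2. move live shards aside so the tailer starts fresh live files
  for await (const f of Deno.readDir("./data/plc/live")) {
    await Deno.rename(`./data/plc/live/${f.name}`, `./data/plc/compacting/${f.name}`);
  }

  // 3. compress each shard; --rm deletes the uncompressed input on success
  const dest = `./data/plc/compacted/${date}`;
  await ensureDir(dest);
  for await (const f of Deno.readDir("./data/plc/compacting")) {
    if (f.name.endsWith(".zst")) continue; // leftover from an interrupted run
    const out = await new Deno.Command("zstd", {
      args: ["--rm", `./data/plc/compacting/${f.name}`],
    }).output();
    if (!out.success) throw new Error(`zstd failed on ${f.name}`);
    // 4. move the compressed shard into the dated compacted directory
    await Deno.rename(`./data/plc/compacting/${f.name}.zst`, `${dest}/${f.name}.zst`);
  }
}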
write-live.ts (new file, 19 lines)

@@ -0,0 +1,19 @@
+import { ensureDir } from "jsr:@std/fs@1";
+
+import { ExportEntry } from "./directory-tailer.ts";
+
+await ensureDir("./data/plc/live");
+
+export async function writeLiveRecord(entry: ExportEntry, raw: string) {
+  const didplc = "did:plc:".length;
+  const prefix = entry.did.substring(didplc, didplc + 2);
+  const out = "./data/plc/live/" + prefix;
+
+  await Deno.writeTextFile(
+    out,
+    [entry.did, entry.createdAt, entry.cid, raw].join("\u001f") + "\n",
+    {
+      append: true,
+    }
+  );
+}
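
A small demonstration of the sorting claim in the commit message, with fabricated lines: U+001F sorts below every printable character and ISO-8601 timestamps sort chronologically as strings, so a plain sort groups lines by DID and orders each group by createdAt:

const lines = [
  "did:plc:bbbbbbbbbbbbbbbbbbbbbbbb\u001f2024-01-02T00:00:00.000Z\u001fcidB\u001f{}",
  "did:plc:aaaaaaaaaaaaaaaaaaaaaaaa\u001f2024-06-01T00:00:00.000Z\u001fcidA2\u001f{}",
  "did:plc:aaaaaaaaaaaaaaaaaaaaaaaa\u001f2024-01-01T00:00:00.000Z\u001fcidA1\u001f{}",
];
lines.sort();
// -> aaa's 2024-01-01 op, then aaa's 2024-06-01 op, then bbb's op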