move to flat files idk

Charlotte Som 2024-11-28 05:08:47 +02:00
parent 47678e1e95
commit c539a4dc5a
7 changed files with 115 additions and 26 deletions


@@ -1,7 +1,6 @@
 {
   "tasks": {
-    "full-scan": "deno run -A --unstable-kv ./full-scan.ts",
-    "scrape": "deno run -A --unstable-kv ./scrape.ts"
+    "scrape": "deno run -A ./scrape.ts"
   },
   "imports": {
     "@std/assert": "jsr:@std/assert@1"


@@ -1,13 +1,24 @@
 {
   "version": "4",
   "specifiers": {
+    "jsr:@std/fs@1": "1.0.6",
     "jsr:@std/json@1": "1.0.1",
+    "jsr:@std/path@^1.0.8": "1.0.8",
     "jsr:@std/streams@1": "1.0.8"
   },
   "jsr": {
+    "@std/fs@1.0.6": {
+      "integrity": "42b56e1e41b75583a21d5a37f6a6a27de9f510bcd36c0c85791d685ca0b85fa2",
+      "dependencies": [
+        "jsr:@std/path"
+      ]
+    },
     "@std/json@1.0.1": {
       "integrity": "1f0f70737e8827f9acca086282e903677bc1bb0c8ffcd1f21bca60039563049f"
     },
+    "@std/path@1.0.8": {
+      "integrity": "548fa456bb6a04d3c1a1e7477986b6cffbce95102d0bb447c67c4ee70e0364be"
+    },
     "@std/streams@1.0.8": {
       "integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3"
     }


@@ -16,16 +16,13 @@ export class DirectoryTailer {
   public abort = new AbortController();
   latestDate: string | undefined;
+  saveRaw: boolean = true; // set to false in production so you don't double-store plc data
 
-  constructor(
-    public kv: Deno.Kv,
-    startDate?: string
-  ) {
+  constructor(startDate?: string) {
     this.latestDate = startDate;
   }
 
-  async processRecord(entry: ExportEntry) {
-    await this.kv.set([entry.did, entry.cid], entry);
+  async processRecord(entry: ExportEntry, raw: string) {
     console.log(
       Deno.inspect(
         {
@@ -36,6 +33,14 @@
         { breakLength: Infinity, compact: true, colors: true }
       )
     );
+
+    const didplc = "did:plc:".length;
+    const prefix = entry.did.substring(didplc, didplc + 2);
+    const out = "./data/plc/live/" + prefix;
+    await Deno.writeTextFile(out, raw + "\n", {
+      append: true,
+    });
   }
 
   async fetchExports() {
@@ -58,24 +63,34 @@
       }
 
       const text = await response.text();
-      await Deno.writeTextFile("data/exports.jsonl", text + "\n", {
-        append: true,
-      });
+      if (this.saveRaw) {
+        await Deno.writeTextFile("data/exports.jsonl", text + "\n", {
+          append: true,
+        });
+      }
 
       let entry: ExportEntry | undefined;
+      const promises = [];
       for (const line of new IterLines(text)) {
         entry = JSON.parse(line) as unknown as ExportEntry;
-        await this.processRecord(entry);
+        this.latestDate = entry.createdAt;
+        promises.push(this.processRecord(entry, line));
       }
+      await Promise.all(promises);
 
       if (entry) {
         this.latestDate = entry.createdAt;
+        const write = Deno.writeTextFile("./data/latest-date", this.latestDate);
+
         const timestamp = new Date(this.latestDate).getTime();
         if (Date.now() - timestamp > 5_000) {
           await sleep(600); // 500 per 5 minutes
         } else {
           await sleep(2500); // sleep a little longer so that we can get more ops per request
         }
+        await write;
       } else {
         await sleep(10_000); // we got nothing! sleep way longer
       }
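
Two notes on the rewritten tailer above: the 600 ms sleep keeps polling under the 500-requests-per-5-minutes limit mentioned in the comment (300,000 ms / 500 = 600 ms), and processRecord now shards raw operations into append-only flat files keyed by the first two characters after the did:plc: prefix. A minimal sketch of that bucket mapping follows; livePathFor is a hypothetical name, not part of this commit:

// Sketch: map a did:plc identifier to its append-only live bucket,
// mirroring the prefix logic in DirectoryTailer.processRecord.
// livePathFor is a hypothetical helper name, not code from this commit.
const livePathFor = (did: string): string => {
  const didplc = "did:plc:".length;
  const prefix = did.substring(didplc, didplc + 2); // two-character shard key
  return "./data/plc/live/" + prefix;
};

// e.g. "did:plc:abcdefghijklmnopqrstuvwx" -> "./data/plc/live/ab"
console.log(livePathFor("did:plc:abcdefghijklmnopqrstuvwx"));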


@@ -1,10 +0,0 @@
-import { ExportEntry } from "./directory-tailer.ts";
-import { kv } from "./main.ts";
-
-export const getDidDocument = async (did: string) => {
-  const listing = kv.list<ExportEntry>({ prefix: [did] });
-  for await (const { value: operation } of listing) {
-    if (operation.nullified) continue;
-    console.log(operation);
-  }
-};

get-operations.ts (new file)

@@ -0,0 +1,59 @@
+import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
+import { ExportEntry } from "./directory-tailer.ts";
+
+export const getOperations = async (did: string) => {
+  const operations = [];
+
+  const didplc = "did:plc:".length;
+  const prefix = did.substring(didplc, didplc + 2);
+
+  const compactedEntries = await Array.fromAsync(
+    Deno.readDir("./data/plc/compacted")
+  );
+  compactedEntries.sort();
+  for (const entry of compactedEntries) {
+    const process = new Deno.Command("zstd", {
+      args: [
+        "-d",
+        `./data/plc/compacted/${entry.name}/${prefix}.zst`,
+        "--stdout",
+      ],
+      cwd: Deno.cwd(),
+      stdout: "piped",
+      stdin: "null",
+      stderr: "piped",
+    }).spawn();
+
+    const lines = process.stdout
+      .pipeThrough(new TextDecoderStream())
+      .pipeThrough(new TextLineStream());
+    for await (const line of lines.values()) {
+      const entry = JSON.parse(line) as unknown as ExportEntry;
+      if (entry.did !== did) continue;
+      operations.push(entry);
+    }
+
+    await process.status;
+  }
+
+  const f = await Deno.open(`./data/plc/live/${prefix}`, { read: true });
+  const lines = f.readable
+    .pipeThrough(new TextDecoderStream())
+    .pipeThrough(new TextLineStream());
+  for await (const line of lines.values()) {
+    const entry = JSON.parse(line) as unknown as ExportEntry;
+    if (entry.did !== did) continue;
+    operations.push(entry);
+  }
+
+  return operations;
+};
+
+if (import.meta.main) {
+  await getOperations(Deno.args[0]);
+
+  const then = performance.now();
+  console.log(await getOperations(Deno.args[0]));
+  console.log(performance.now() - then);
+}
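
The new reader replaces the deleted KV-backed getDidDocument helper. A sketch of how that helper could be rebuilt on top of getOperations, keeping the old nullified-operation filter; this is assumed usage, not code from the commit:

// Sketch (assumed usage): list the non-nullified operations for a DID via the
// flat files, mirroring what the deleted KV-based getDidDocument printed.
import { getOperations } from "./get-operations.ts";

export const getDidDocument = async (did: string) => {
  for (const operation of await getOperations(did)) {
    if (operation.nullified) continue; // same filter as the old helper
    console.log(operation);
  }
};

if (import.meta.main) {
  await getDidDocument(Deno.args[0]);
}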


@@ -1,4 +1,7 @@
+import { ensureDir } from "jsr:@std/fs@1";
 import { DirectoryTailer } from "./directory-tailer.ts";
 
-export const kv = await Deno.openKv("./data/directory-kv.db");
-export const tailer = new DirectoryTailer(kv);
+await ensureDir("./data/plc/compacted");
+await ensureDir("./data/plc/live");
+
+export const tailer = new DirectoryTailer();
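
Since DirectoryTailer no longer takes a Deno.Kv handle and fetchExports now persists its cursor to ./data/latest-date, a caller could plausibly resume from that file on restart. The wiring below is an assumption (scrape.ts is not part of this diff), shown only to illustrate how the persisted cursor and the startDate constructor argument fit together:

// Sketch (assumed wiring): resume tailing from the cursor that
// DirectoryTailer.fetchExports writes to ./data/latest-date.
import { DirectoryTailer } from "./directory-tailer.ts";

let startDate: string | undefined;
try {
  startDate = (await Deno.readTextFile("./data/latest-date")).trim();
} catch {
  // first run: no cursor yet, start from the beginning of the export
}

const tailer = new DirectoryTailer(startDate);
await tailer.fetchExports();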


@@ -1,16 +1,28 @@
 import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
 import { ExportEntry } from "./directory-tailer.ts";
 import { tailer } from "./main.ts";
 
 export const fullScan = async () => {
   using exports = await Deno.open("./data/exports.jsonl", { read: true });
 
+  // interrupted at 2024-11-12T21:33:47.118Z
+  // byte offset
+  await exports.seek(13526812085, Deno.SeekMode.Start);
+
   const lineStream = exports.readable
     .pipeThrough(new TextDecoderStream())
     .pipeThrough(new TextLineStream());
 
+  {
+    const reader = lineStream.getReader();
+    const line = await reader.read();
+    console.log("dropping: " + line.value);
+    reader.releaseLock();
+  }
+
   for await (const line of lineStream.values()) {
     const entry = JSON.parse(line) as unknown as ExportEntry;
-    await tailer.processRecord(entry);
+    await tailer.processRecord(entry, line);
   }
 };
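
fullScan resumes an interrupted pass by seeking to a hard-coded byte offset and discarding the first, likely partial, line. A generalized sketch of that resume pattern with the offset as a parameter; openLinesAt is a hypothetical helper, not part of the repo:

// Sketch: open a JSONL file at a byte offset and hand back a line stream,
// dropping the first (possibly partial) line, generalizing the resume
// logic in fullScan. openLinesAt is a hypothetical helper name.
import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";

export const openLinesAt = async (path: string, offset: number) => {
  const file = await Deno.open(path, { read: true });
  if (offset > 0) await file.seek(offset, Deno.SeekMode.Start);

  const lines = file.readable
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream());

  if (offset > 0) {
    // discard the line we landed in the middle of, as fullScan does
    const reader = lines.getReader();
    const { value } = await reader.read();
    console.log("dropping: " + value);
    reader.releaseLock();
  }

  return lines;
};

With a helper like this, the loop body could iterate for await (const line of await openLinesAt("./data/exports.jsonl", 13526812085)) without the inline getReader block.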