plc-replica/directory-tailer.ts

103 lines
2.7 KiB
TypeScript
Raw Permalink Normal View History

2024-11-27 16:30:50 +00:00
import { IterLines } from "./util.ts";
import { writeLiveRecord } from "./write-live.ts";
2024-11-27 16:30:50 +00:00
/** PLC operation payload. It is kept opaque: this file never inspects its shape. */
type PlcOperation = unknown;

/** Shape that each JSON line of an export response is parsed into. */
export interface ExportEntry {
  /** Value of the entry's `did` field. */
  did: string;
  /** The operation payload, passed through without interpretation. */
  operation: PlcOperation;
  /** Value of the entry's `cid` field; used by the tailer to detect repeated entries. */
  cid: string;
  /** Value of the entry's `nullified` flag. */
  nullified: boolean;
  /** Creation time as an ISO timestamp string. */
  createdAt: string;
}
/**
 * Returns a promise that settles once `timeout` milliseconds have elapsed.
 * The timer invokes the resolver without arguments, so the promise fulfils
 * with `undefined`.
 */
const sleep = (timeout: number) => {
  return new Promise((resolve) => {
    setTimeout(resolve, timeout);
  });
};
/**
 * Polls the `https://plc.directory/export` endpoint in a loop, hands every
 * entry it has not just seen to `writeLiveRecord`, and records the timestamp
 * of the newest entry in `./data/latest-date`.
 */
export class DirectoryTailer {
  /** Aborting this controller makes {@link fetchExports} stop before its next request. */
  public abort = new AbortController();

  /** CIDs of every entry contained in the previous batch; entries whose CID is in here are skipped. */
  lastBatchCIDs = new Set<string>();

  /** `createdAt` of the newest entry seen so far; sent as the `after` query parameter when defined. */
  latestDate: string | undefined;

  /** When true, each raw response body is appended to `data/exports.jsonl`. */
  saveRaw: boolean = true; // set to false in production so you don't double-store plc data

  /**
   * @param startDate Initial value for {@link latestDate}; when omitted the
   *   first request is sent without an `after` parameter.
   */
  constructor(startDate?: string) {
    this.latestDate = startDate;
  }

  /**
   * Logs a one-line summary of the entry and forwards it to `writeLiveRecord`.
   *
   * @param entry The parsed export entry.
   * @param raw The unparsed JSON line that `entry` was parsed from.
   */
  async processRecord(entry: ExportEntry, raw: string): Promise<void> {
    const summary = {
      createdAt: entry.createdAt,
      did: entry.did,
      cid: entry.cid,
    };
    console.log(
      Deno.inspect(summary, { breakLength: Infinity, compact: true, colors: true })
    );
    await writeLiveRecord(entry, raw);
  }

  /**
   * Runs the polling loop until {@link abort} is signalled (checked before
   * each request) or the endpoint answers with a non-200 status.
   *
   * Rejections from `fetch`, `JSON.parse`, `writeLiveRecord` and the file
   * writes are not caught here and propagate to the caller.
   */
  async fetchExports(): Promise<void> {
    const url = new URL("https://plc.directory/export");
    url.searchParams.set("count", "1000");

    while (!this.abort.signal.aborted) {
      if (this.latestDate !== undefined) {
        url.searchParams.set("after", this.latestDate);
      }

      console.log("%c[+]%c %s", "color: green", "color: unset", url.toString());
      const response = await fetch(url, {
        headers: { "User-Agent": "cerulea-plc-replica/1.0 (cerulea.blue)" },
      });
      if (response.status !== 200) {
        console.error(response);
        // Best-effort release of the unread body so it does not stay open
        // after the loop exits; a failure to cancel must not mask the exit.
        await response.body?.cancel().catch(() => {});
        break;
      }

      const text = await response.text();
      if (this.saveRaw) {
        await Deno.writeTextFile("data/exports.jsonl", text + "\n", {
          append: true,
        });
      }

      let entry: ExportEntry | undefined;
      const promises: Promise<void>[] = [];
      const cids = new Set<string>();
      for (const line of new IterLines(text)) {
        // NOTE(review): the parsed value is trusted to match ExportEntry; no validation is done.
        entry = JSON.parse(line) as ExportEntry;
        // Record the CID before the skip check. If skipped entries were left
        // out, a batch made up only of repeats would empty the set and the
        // same entries would be processed again on the following batch.
        cids.add(entry.cid);
        if (this.lastBatchCIDs.has(entry.cid)) continue;
        this.latestDate = entry.createdAt;
        promises.push(this.processRecord(entry, line));
      }
      await Promise.all(promises);
      this.lastBatchCIDs = cids;

      if (entry) {
        const latest = entry.createdAt;
        this.latestDate = latest;
        const write = Deno.writeTextFile("./data/latest-date", latest);
        const behindBy = Date.now() - new Date(latest).getTime();
        // More than 5s behind: poll quickly (600ms, i.e. 500 per 5 minutes).
        // Otherwise sleep a little longer so that we can get more ops per request.
        const delay = behindBy > 5_000 ? 600 : 2500;
        // Await the write together with the sleep so a failed write always
        // has a handler attached instead of rejecting unobserved mid-sleep.
        await Promise.all([write, sleep(delay)]);
      } else {
        await sleep(10_000); // we got nothing! sleep way longer
      }
    }
  }
}