(wip) initial commit

This commit is contained in:
Charlotte Som 2024-11-27 18:30:50 +02:00
commit 47678e1e95
10 changed files with 223 additions and 0 deletions

6
README.md Normal file
View file

@ -0,0 +1,6 @@
# plc-replica
`plc.directory` live-tailing replica for Cerulea.
This lets us go as hard as we want on the PLC API from AppView/Relay land and not worry about running into any rate limits.
In addition, since Cerulea infrastructure is mostly based in central Europe, it's a sizeable latency cut as well, at the cost of maybe 2-5s replication delay.

2
data/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*
!.gitignore

9
deno.json Normal file
View file

@ -0,0 +1,9 @@
{
"tasks": {
"full-scan": "deno run -A --unstable-kv ./full-scan.ts",
"scrape": "deno run -A --unstable-kv ./scrape.ts"
},
"imports": {
"@std/assert": "jsr:@std/assert@1"
}
}

20
deno.lock Normal file
View file

@ -0,0 +1,20 @@
{
"version": "4",
"specifiers": {
"jsr:@std/json@1": "1.0.1",
"jsr:@std/streams@1": "1.0.8"
},
"jsr": {
"@std/json@1.0.1": {
"integrity": "1f0f70737e8827f9acca086282e903677bc1bb0c8ffcd1f21bca60039563049f"
},
"@std/streams@1.0.8": {
"integrity": "b41332d93d2cf6a82fe4ac2153b930adf1a859392931e2a19d9fabfb6f154fb3"
}
},
"workspace": {
"dependencies": [
"jsr:@std/assert@1"
]
}
}

84
directory-tailer.ts Normal file
View file

@ -0,0 +1,84 @@
import { IterLines } from "./util.ts";
// Individual PLC operations are not modelled yet; treated as opaque JSON.
type PlcOperation = unknown;
/** One line of the `plc.directory` /export JSONL feed. */
export interface ExportEntry {
  did: string;
  operation: PlcOperation;
  cid: string;
  nullified: boolean;
  createdAt: string; // iso timestamp
}
/** Resolves after `timeout` milliseconds. */
const sleep = (timeout: number) => new Promise((r) => setTimeout(r, timeout));
/**
 * Tails the `plc.directory` /export feed and mirrors every operation
 * into a Deno KV store under the key [did, cid].
 */
export class DirectoryTailer {
  // Aborting this signal stops the fetchExports() polling loop.
  public abort = new AbortController();
  // `createdAt` of the newest entry processed so far; sent as the
  // `after` cursor on the next /export request.
  latestDate: string | undefined;
  constructor(
    public kv: Deno.Kv,
    startDate?: string
  ) {
    this.latestDate = startDate;
  }
  /** Stores one export entry in KV (keyed [did, cid]) and logs a one-line summary. */
  async processRecord(entry: ExportEntry) {
    await this.kv.set([entry.did, entry.cid], entry);
    console.log(
      Deno.inspect(
        {
          createdAt: entry.createdAt,
          did: entry.did,
          cid: entry.cid,
        },
        { breakLength: Infinity, compact: true, colors: true }
      )
    );
  }
  /**
   * Polls https://plc.directory/export until aborted, appending each raw
   * page to data/exports.jsonl and feeding every entry through
   * processRecord(). Sleeps between requests to respect the directory's
   * rate limit.
   */
  async fetchExports() {
    const url = new URL("https://plc.directory/export");
    url.searchParams.set("count", "1000");
    while (!this.abort.signal.aborted) {
      if (this.latestDate !== undefined) {
        url.searchParams.set("after", this.latestDate);
      }
      console.log("%c[+]%c %s", "color: green", "color: unset", url.toString());
      const response = await fetch(url, {
        headers: { "User-Agent": "cerulea-plc-replica/1.0 (cerulea.blue)" },
      });
      if (response.status !== 200) {
        // NOTE(review): any non-200 (including a transient 5xx) ends the
        // tailer permanently — consider retrying with backoff instead.
        console.error(response);
        break;
      }
      const text = await response.text();
      // Keep a raw on-disk journal of everything ingested (scrape.ts
      // reads its tail to recover the cursor after a restart).
      await Deno.writeTextFile("data/exports.jsonl", text + "\n", {
        append: true,
      });
      let entry: ExportEntry | undefined;
      for (const line of new IterLines(text)) {
        entry = JSON.parse(line) as unknown as ExportEntry;
        await this.processRecord(entry);
      }
      if (entry) {
        // Advance the cursor to the last entry we processed.
        this.latestDate = entry.createdAt;
        const timestamp = new Date(this.latestDate).getTime();
        if (Date.now() - timestamp > 5_000) {
          // Still catching up (data >5s old): poll at the rate-limit ceiling.
          await sleep(600); // 500 per 5 minutes
        } else {
          await sleep(2500); // sleep a little longer so that we can get more ops per request
        }
      } else {
        await sleep(10_000); // we got nothing! sleep way longer
      }
    }
  }
}

17
full-scan.ts Normal file
View file

@ -0,0 +1,17 @@
import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
import { ExportEntry } from "./directory-tailer.ts";
import { tailer } from "./main.ts";
export const fullScan = async () => {
using exports = await Deno.open("./data/exports.jsonl", { read: true });
const lineStream = exports.readable
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream());
for await (const line of lineStream.values()) {
const entry = JSON.parse(line) as unknown as ExportEntry;
await tailer.processRecord(entry);
}
};
await fullScan();

10
get-did-document.ts Normal file
View file

@ -0,0 +1,10 @@
import { ExportEntry } from "./directory-tailer.ts";
import { kv } from "./main.ts";
/**
 * Logs every non-nullified PLC operation stored for `did`.
 * (wip: document assembly from the operation log is not implemented yet.)
 */
export const getDidDocument = async (did: string) => {
  for await (const { value } of kv.list<ExportEntry>({ prefix: [did] })) {
    if (!value.nullified) {
      console.log(value);
    }
  }
};

4
main.ts Normal file
View file

@ -0,0 +1,4 @@
import { DirectoryTailer } from "./directory-tailer.ts";
// Process-wide singletons: one KV store and one tailer shared by every script.
export const kv = await Deno.openKv("./data/directory-kv.db");
export const tailer = new DirectoryTailer(kv);

37
scrape.ts Normal file
View file

@ -0,0 +1,37 @@
import { TextLineStream } from "jsr:@std/streams@1/text-line-stream";
import { ExportEntry } from "./directory-tailer.ts";
import { tailer } from "./main.ts";
/**
 * Recovers the tailer's `after` cursor from the tail of the on-disk
 * journal: reads the final complete line of data/exports.jsonl and uses
 * its `createdAt` timestamp.
 */
export const catchUp = async () => {
  using exports = await Deno.open("./data/exports.jsonl", { read: true });
  await exports.seek(-4096, Deno.SeekMode.End); // grab last 4kib
  const lineStream = exports.readable
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream());
  // The first line after an arbitrary byte seek is likely cut off
  // mid-record, so read and drop it.
  {
    const reader = lineStream.getReader();
    await reader.read();
    reader.releaseLock();
  }
  let finalLine: string | undefined;
  for await (const line of lineStream) {
    finalLine = line;
  }
  if (finalLine) {
    const lastEntry = JSON.parse(finalLine) as unknown as ExportEntry;
    tailer.latestDate = lastEntry.createdAt;
  }
};
// Cursor recovery is best-effort: a missing or tiny journal just means we
// start tailing from the beginning.
try {
  await catchUp();
} catch (err) {
  console.warn(err);
}
await tailer.fetchExports();

34
util.ts Normal file
View file

@ -0,0 +1,34 @@
/**
 * Splits a string into lines on "\n", yielding each line without its
 * terminator. A trailing newline does not produce an extra empty line.
 */
export class IterLines {
  // Index of the next unread character in `text`.
  public position: number = 0;

  constructor(public text: string) {}

  /**
   * Returns the next line, or `undefined` once the whole string has been
   * consumed.
   */
  next(): string | undefined {
    if (this.position >= this.text.length) {
      return undefined; // no more lines
    }
    const start = this.position;
    while (
      this.position < this.text.length &&
      this.text[this.position] !== "\n"
    ) {
      this.position++;
    }
    const line = this.text.slice(start, this.position);
    this.position++; // skip the newline character
    return line;
  }

  [Symbol.iterator](): Iterator<string> {
    return {
      next: (): IteratorResult<string> => {
        // BUG FIX: the previous implementation kept a one-line lookahead
        // and reported the final line together with `done: true`; for..of
        // discards the value of a done result, so the last line of every
        // input was silently dropped. Yield each line with `done: false`
        // and only signal done once next() is exhausted.
        const line = this.next();
        return line === undefined
          ? { value: undefined, done: true }
          : { value: line, done: false };
      },
    };
  }
}