NOTE(review): this patch was recovered from a copy whose line breaks had been
collapsed. Tokens are unchanged; blank context lines and leading indentation
were reconstructed to satisfy the hunk header counts and are assumptions.
Verify against the target files before applying.

diff --git a/directory-tailer.ts b/directory-tailer.ts
index e577bce..24b6081 100644
--- a/directory-tailer.ts
+++ b/directory-tailer.ts
@@ -15,6 +15,8 @@ const sleep = (timeout: number) => new Promise((r) => setTimeout(r, timeout));
 
 export class DirectoryTailer {
   public abort = new AbortController();
+  lastBatchCIDs = new Set();
+
   latestDate: string | undefined;
   saveRaw: boolean = true; // set to false in production so you don't double-store plc data
 
@@ -72,13 +74,19 @@ export class DirectoryTailer {
 
       let entry: ExportEntry | undefined;
       const promises = [];
+      const cids = new Set();
       for (const line of new IterLines(text)) {
         entry = JSON.parse(line) as unknown as ExportEntry;
+        if (this.lastBatchCIDs.has(entry.cid)) continue;
+        this.latestDate = entry.createdAt;
+        cids.add(entry.cid);
         promises.push(this.processRecord(entry, line));
       }
 
       await Promise.all(promises);
 
+      this.lastBatchCIDs = cids;
+
       if (entry) {
         this.latestDate = entry.createdAt;
         const write = Deno.writeTextFile("./data/latest-date", this.latestDate);
diff --git a/scrape.ts b/scrape.ts
index bdcb85b..59e7187 100644
--- a/scrape.ts
+++ b/scrape.ts
@@ -17,14 +17,15 @@ export const catchUp = async () => {
     lineReader.releaseLock();
   }
 
-  let lastLine: string | undefined;
+  tailer.lastBatchCIDs.clear();
   for await (const line of lineStream.values()) {
-    lastLine = line;
-  }
-
-  if (lastLine) {
-    const entry = JSON.parse(lastLine) as unknown as ExportEntry;
-    tailer.latestDate = entry.createdAt;
+    try {
+      const entry = JSON.parse(line) as unknown as ExportEntry;
+      tailer.latestDate = entry.createdAt;
+      tailer.lastBatchCIDs.add(entry.cid);
+    } catch (_err) {
+      // ignore
+    }
   }
 };
 