filter duplicate cids
This commit is contained in:
parent
e9af92c00f
commit
0d2412813f
2 changed files with 16 additions and 7 deletions
|
|
@ -15,6 +15,8 @@ const sleep = (timeout: number) => new Promise((r) => setTimeout(r, timeout));
|
||||||
export class DirectoryTailer {
|
export class DirectoryTailer {
|
||||||
public abort = new AbortController();
|
public abort = new AbortController();
|
||||||
|
|
||||||
|
lastBatchCIDs = new Set<string>();
|
||||||
|
|
||||||
latestDate: string | undefined;
|
latestDate: string | undefined;
|
||||||
saveRaw: boolean = true; // set to false in production so you don't double-store plc data
|
saveRaw: boolean = true; // set to false in production so you don't double-store plc data
|
||||||
|
|
||||||
|
|
@ -72,13 +74,19 @@ export class DirectoryTailer {
|
||||||
|
|
||||||
let entry: ExportEntry | undefined;
|
let entry: ExportEntry | undefined;
|
||||||
const promises = [];
|
const promises = [];
|
||||||
|
const cids = new Set<string>();
|
||||||
for (const line of new IterLines(text)) {
|
for (const line of new IterLines(text)) {
|
||||||
entry = JSON.parse(line) as unknown as ExportEntry;
|
entry = JSON.parse(line) as unknown as ExportEntry;
|
||||||
|
if (this.lastBatchCIDs.has(entry.cid)) continue;
|
||||||
|
|
||||||
this.latestDate = entry.createdAt;
|
this.latestDate = entry.createdAt;
|
||||||
|
cids.add(entry.cid);
|
||||||
promises.push(this.processRecord(entry, line));
|
promises.push(this.processRecord(entry, line));
|
||||||
}
|
}
|
||||||
await Promise.all(promises);
|
await Promise.all(promises);
|
||||||
|
|
||||||
|
this.lastBatchCIDs = cids;
|
||||||
|
|
||||||
if (entry) {
|
if (entry) {
|
||||||
this.latestDate = entry.createdAt;
|
this.latestDate = entry.createdAt;
|
||||||
const write = Deno.writeTextFile("./data/latest-date", this.latestDate);
|
const write = Deno.writeTextFile("./data/latest-date", this.latestDate);
|
||||||
|
|
|
||||||
15
scrape.ts
15
scrape.ts
|
|
@ -17,14 +17,15 @@ export const catchUp = async () => {
|
||||||
lineReader.releaseLock();
|
lineReader.releaseLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
let lastLine: string | undefined;
|
tailer.lastBatchCIDs.clear();
|
||||||
for await (const line of lineStream.values()) {
|
for await (const line of lineStream.values()) {
|
||||||
lastLine = line;
|
try {
|
||||||
}
|
const entry = JSON.parse(line) as unknown as ExportEntry;
|
||||||
|
tailer.latestDate = entry.createdAt;
|
||||||
if (lastLine) {
|
tailer.lastBatchCIDs.add(entry.cid);
|
||||||
const entry = JSON.parse(lastLine) as unknown as ExportEntry;
|
} catch (_err) {
|
||||||
tailer.latestDate = entry.createdAt;
|
// ignore
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue