// sync-playground/sync/ordt/historical-lww.ts
// (file listing: 95 lines, 3 KiB, TypeScript)

import { basicCompare, PeerId, Primitive, Tid, Timestamp } from "../common.ts";
import { Comparator, OrderedSet } from "../ordered-set.ts";
/**
 * Module-private sentinel stored as an operation's value to record a deletion.
 * `HistoricalLWW.delete` writes it and `HistoricalLWW.get` reports it as `undefined`.
 */
const tombstone = Symbol("multi-lww.tombstone");
/**
 * One recorded write (or delete) against a single key of a multi-key
 * last-write-wins register whose keys and values are described by `Shape`.
 */
export type MultiLWWOperation<Shape extends Record<string, Primitive>> = {
// who wrote and when; in this file it is built as `[from, at]` (peer id, then tid)
timestamp: Timestamp;
// the key the operation applies to
key: keyof Shape;
// the written value, or the `tombstone` sentinel when the key was deleted
value: Shape[keyof Shape] | typeof tombstone;
};
/** a last-write-wins key/value register with history */
/**
 * A last-write-wins key/value register with history.
 *
 * Every write and delete is kept as an operation in an ordered set. The set's
 * comparator orders operations by key, then by tid, then by peer id with the
 * comparison sign inverted, so all operations for one key form a contiguous
 * run whose last element is the current (winning) write.
 */
export class HistoricalLWW<Shape extends Record<string, Primitive>> {
  /** The operation log; an OrderedSet<Operation> is an ORDT. */
  operations: OrderedSet<MultiLWWOperation<Shape>>;

  // we keep some indices for performance:

  /** For each key, the index in `operations.items` of the last operation of that key's run. */
  lookup = new Map<keyof Shape, number>();

  /** Highest tid recorded so far; -1 while the register is empty. */
  latest: Tid = -1;

  constructor() {
    type Operation = MultiLWWOperation<Shape>;
    const comparator: Comparator<Operation> = (a, b) => {
      const byKey = basicCompare(a.key, b.key);
      if (byKey !== 0) return byKey;
      const byTid = basicCompare(a.timestamp[1], b.timestamp[1]);
      if (byTid !== 0) return byTid;
      // same key and tid: break the tie on peer id, sign inverted
      return -basicCompare(a.timestamp[0], b.timestamp[0]);
    };
    this.operations = new OrderedSet<Operation>(comparator);
  }

  /**
   * Records a write of `value` to `key`.
   *
   * @param key the key to write
   * @param value the value to store
   * @param from the peer making the write
   * @param at the tid of the write; defaults to one past the highest tid seen
   */
  // prettier-ignore
  set<K extends keyof Shape, V extends Shape[K]>(
    key: K, value: V,
    from: PeerId, at: Tid = this.latest + 1,
  ): void {
    this.#apply({ key, value, timestamp: [from, at] });
  }

  /**
   * Records a deletion of `key` by storing a tombstone operation.
   *
   * @param key the key to delete
   * @param from the peer making the deletion
   * @param at the tid of the deletion; defaults to one past the highest tid seen
   */
  delete<K extends keyof Shape>(key: K, from: PeerId, at: Tid = this.latest + 1): void {
    this.#apply({ key, value: tombstone, timestamp: [from, at] });
  }

  /**
   * Reads the value of `key`, optionally as it was at tid `at`.
   *
   * @param key the key to read
   * @param at when given, the newest operation whose tid is `<= at` is used
   * @returns the stored value, or `undefined` when the key was never written,
   *   was deleted, or had no operation at or before `at`
   */
  get<K extends keyof Shape, V extends Shape[K]>(key: K, at?: Tid): V | undefined {
    let idx = this.lookup.get(key);
    // compare against undefined explicitly: index 0 is a valid position
    if (idx === undefined) return undefined;

    const items = this.operations.items;
    let operation = items[idx];
    // `at` may legitimately be 0, so test for presence rather than truthiness
    if (at !== undefined) {
      // walk back through this key's run only; stepping past its first
      // operation would read another key's history
      while (operation !== undefined && operation.key === key && operation.timestamp[1] > at) {
        idx--;
        operation = items[idx];
      }
    }
    // ran off the start of the run: the key had no value at that time
    if (operation === undefined || operation.key !== key) return undefined;

    const value = operation.value;
    return value === tombstone ? undefined : (value as V);
  }

  /** Inserts one operation into the set and keeps `lookup` and `latest` in step. */
  #apply(op: MultiLWWOperation<Shape>): void {
    const sizeBefore = this.operations.items.length;
    const resultingIndex = this.operations.add(op);

    // Only shift the cached indices when an element was really inserted;
    // if the set declined the operation nothing has moved.
    if (this.operations.items.length > sizeBefore) {
      for (const [key, index] of this.lookup.entries()) {
        if (index >= resultingIndex) this.lookup.set(key, index + 1);
      }
    }

    // the new operation becomes the key's latest only if it sorts after the current one
    const existingIndex = this.lookup.get(op.key);
    if (existingIndex === undefined || existingIndex < resultingIndex) {
      this.lookup.set(op.key, resultingIndex);
    }
    if (op.timestamp[1] > this.latest) this.latest = op.timestamp[1];
  }

  /** Rebuilds `lookup` from scratch from the current, already sorted, items. */
  #recalculateLookup(): void {
    // start clean so no stale index survives a rebuild
    this.lookup.clear();
    // items are sorted, so the last index written for a key is the end of its run
    this.operations.items.forEach((operation, idx) => {
      this.lookup.set(operation.key, idx);
    });
  }

  /**
   * Applies operations one at a time, updating the indices incrementally.
   *
   * @param operations the operations to insert
   */
  patch(operations: MultiLWWOperation<Shape>[]): void {
    for (const op of operations) this.#apply(op);
  }

  /**
   * Bulk-inserts operations, then re-sorts once and rebuilds the indices.
   * Operations that compare equal to one already present are dropped, so
   * merging the same log twice leaves the register unchanged.
   *
   * @param operations the operations to merge in
   */
  merge(operations: MultiLWWOperation<Shape>[]): void {
    const comparator = this.operations.comparator;
    const combined = this.operations.items;

    // push one by one: spreading a very large array into push() can exceed
    // the engine's argument limit
    for (const op of operations) {
      combined.push(op);
      // keep the clock moving so later default timestamps sort after merged writes
      if (op.timestamp[1] > this.latest) this.latest = op.timestamp[1];
    }
    combined.sort(comparator);

    // sort() is stable, so of each group of equal operations the pre-existing
    // one comes first and is the one kept
    const deduped: MultiLWWOperation<Shape>[] = [];
    let previous: MultiLWWOperation<Shape> | undefined;
    for (const op of combined) {
      if (previous === undefined || comparator(previous, op) !== 0) deduped.push(op);
      previous = op;
    }

    this.operations.items = deduped;
    this.#recalculateLookup();
  }

  /**
   * Discards history. The latest operation of every key is always kept.
   *
   * @param since when given, operations with a tid `>= since` are kept as well
   */
  compact(since?: Tid): void {
    this.operations.items = this.operations.items.filter(
      (it, idx) =>
        // explicit presence check: `since` may legitimately be 0
        (since !== undefined && it.timestamp[1] >= since) || this.lookup.get(it.key) === idx,
    );
    this.#recalculateLookup();
  }
}