compact wire protocol
This commit is contained in:
parent
76541aea85
commit
90ab38c989
4 changed files with 31 additions and 18 deletions
|
@ -1,4 +1,5 @@
|
||||||
import diff from "npm:fast-diff";
|
import diff from "npm:fast-diff";
|
||||||
|
import { decompressOps } from "../compression.ts";
|
||||||
import { Packet } from "../proto.ts";
|
import { Packet } from "../proto.ts";
|
||||||
import { CausalTree, CausalTreeOp } from "../sync/ordt/causal-tree.ts";
|
import { CausalTree, CausalTreeOp } from "../sync/ordt/causal-tree.ts";
|
||||||
import { PlainTextOperation, PlainTextORDT } from "../sync/ordt/plain-text.ts";
|
import { PlainTextOperation, PlainTextORDT } from "../sync/ordt/plain-text.ts";
|
||||||
|
@ -31,7 +32,8 @@ const initialized = Promise.withResolvers<Packet & { t: "init" }>();
|
||||||
socket.addEventListener("message", ev => {
|
socket.addEventListener("message", ev => {
|
||||||
const packet = JSON.parse(ev.data) as Packet;
|
const packet = JSON.parse(ev.data) as Packet;
|
||||||
if (packet.t === "init") {
|
if (packet.t === "init") {
|
||||||
for (const op of packet.ops) pt.apply(op);
|
const ops = decompressOps(packet.ops);
|
||||||
|
for (const op of ops) pt.apply(op);
|
||||||
initialized.resolve(packet);
|
initialized.resolve(packet);
|
||||||
} else if (packet.t === "op") {
|
} else if (packet.t === "op") {
|
||||||
pt.apply(packet.op);
|
pt.apply(packet.op);
|
||||||
|
|
|
@ -2,14 +2,15 @@ import { Timestamp } from "./sync/common.ts";
|
||||||
import { CausalTree, CausalTreeOp, WeakCausalTreeOp } from "./sync/ordt/causal-tree.ts";
|
import { CausalTree, CausalTreeOp, WeakCausalTreeOp } from "./sync/ordt/causal-tree.ts";
|
||||||
import { PlainTextOperation } from "./sync/ordt/plain-text.ts";
|
import { PlainTextOperation } from "./sync/ordt/plain-text.ts";
|
||||||
|
|
||||||
type VerbatimRun = WeakCausalTreeOp<PlainTextOperation>;
|
type VerbatimOp = WeakCausalTreeOp<PlainTextOperation>;
|
||||||
type CompressedRun = { st: Timestamp; p: Timestamp | undefined; seq: string };
|
type CompressedRun = { st: Timestamp; p: Timestamp | undefined; seq: string };
|
||||||
type Run = VerbatimRun | CompressedRun;
|
|
||||||
|
|
||||||
export function compressOps(ops: CausalTreeOp<PlainTextOperation>[]): Run[] {
|
export type CompactedOperations = (VerbatimOp | CompressedRun)[];
|
||||||
if (ops.length === 0) throw new Error("can't create a compressed run!");
|
|
||||||
|
|
||||||
const runs: Run[] = [];
|
export function compressOps(ops: CausalTreeOp<PlainTextOperation>[]): CompactedOperations {
|
||||||
|
if (ops.length === 0) return [];
|
||||||
|
|
||||||
|
const runs: CompactedOperations = [];
|
||||||
let seq = "";
|
let seq = "";
|
||||||
let cnt = 0;
|
let cnt = 0;
|
||||||
let start: Timestamp | undefined;
|
let start: Timestamp | undefined;
|
||||||
|
@ -18,8 +19,10 @@ export function compressOps(ops: CausalTreeOp<PlainTextOperation>[]): Run[] {
|
||||||
const op = ops[i];
|
const op = ops[i];
|
||||||
const lastOp = ops[i - 1];
|
const lastOp = ops[i - 1];
|
||||||
|
|
||||||
if (!start) start = op.at;
|
if (!start) {
|
||||||
if (!firstParent) firstParent = op.parent?.at;
|
start = op.at;
|
||||||
|
firstParent = op.parent?.at;
|
||||||
|
}
|
||||||
|
|
||||||
if (
|
if (
|
||||||
op.type === "insert" &&
|
op.type === "insert" &&
|
||||||
|
@ -54,16 +57,18 @@ export function compressOps(ops: CausalTreeOp<PlainTextOperation>[]): Run[] {
|
||||||
return runs;
|
return runs;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function decompressOps(runs: Run[]): WeakCausalTreeOp<PlainTextOperation>[] {
|
export function decompressOps(
|
||||||
|
compacted: CompactedOperations,
|
||||||
|
): WeakCausalTreeOp<PlainTextOperation>[] {
|
||||||
const ops: WeakCausalTreeOp<PlainTextOperation>[] = [];
|
const ops: WeakCausalTreeOp<PlainTextOperation>[] = [];
|
||||||
|
|
||||||
for (const run of runs) {
|
for (const item of compacted) {
|
||||||
if ("st" in run) {
|
if ("st" in item) {
|
||||||
let i = 0;
|
let i = 0;
|
||||||
let parent: Timestamp | undefined = run.p;
|
let parent: Timestamp | undefined = item.p;
|
||||||
for (const c of run.seq) {
|
for (const c of item.seq) {
|
||||||
const op = {
|
const op = {
|
||||||
at: [run.st[0], run.st[1] + i] as Timestamp,
|
at: [item.st[0], item.st[1] + i] as Timestamp,
|
||||||
parent,
|
parent,
|
||||||
type: "insert",
|
type: "insert",
|
||||||
sequence: c,
|
sequence: c,
|
||||||
|
@ -73,7 +78,7 @@ export function decompressOps(runs: Run[]): WeakCausalTreeOp<PlainTextOperation>
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ops.push(run);
|
ops.push(item);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
3
proto.ts
3
proto.ts
|
@ -1,6 +1,7 @@
|
||||||
|
import { CompactedOperations } from "./compression.ts";
|
||||||
import { WeakCausalTreeOp } from "./sync/ordt/causal-tree.ts";
|
import { WeakCausalTreeOp } from "./sync/ordt/causal-tree.ts";
|
||||||
import { PlainTextOperation } from "./sync/ordt/plain-text.ts";
|
import { PlainTextOperation } from "./sync/ordt/plain-text.ts";
|
||||||
|
|
||||||
export type Packet =
|
export type Packet =
|
||||||
| { t: "init"; ops: WeakCausalTreeOp<PlainTextOperation>[]; you: string }
|
| { t: "init"; ops: CompactedOperations; you: string }
|
||||||
| { t: "op"; op: WeakCausalTreeOp<PlainTextOperation> };
|
| { t: "op"; op: WeakCausalTreeOp<PlainTextOperation> };
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import { Application, Router } from "@oak/oak";
|
import { Application, Router } from "@oak/oak";
|
||||||
|
import { compressOps } from "./compression.ts";
|
||||||
import { Packet } from "./proto.ts";
|
import { Packet } from "./proto.ts";
|
||||||
import { CausalTree } from "./sync/ordt/causal-tree.ts";
|
import { CausalTree } from "./sync/ordt/causal-tree.ts";
|
||||||
import { PlainTextORDT } from "./sync/ordt/plain-text.ts";
|
import { PlainTextORDT } from "./sync/ordt/plain-text.ts";
|
||||||
|
@ -13,10 +14,14 @@ const sockets = new Set<WebSocket>();
|
||||||
router.get("/api/connect", ctx => {
|
router.get("/api/connect", ctx => {
|
||||||
const socket = ctx.upgrade();
|
const socket = ctx.upgrade();
|
||||||
sockets.add(socket);
|
sockets.add(socket);
|
||||||
|
|
||||||
|
const connectionId = crypto.randomUUID();
|
||||||
|
|
||||||
socket.addEventListener("message", event => {
|
socket.addEventListener("message", event => {
|
||||||
if (typeof event.data !== "string") return;
|
if (typeof event.data !== "string") return;
|
||||||
|
|
||||||
const op = JSON.parse(event.data);
|
const op = JSON.parse(event.data);
|
||||||
|
op.at[0] = connectionId;
|
||||||
pt.apply(op); // mutates op to strong
|
pt.apply(op); // mutates op to strong
|
||||||
const weakOp = CausalTree.toWeakOp(op);
|
const weakOp = CausalTree.toWeakOp(op);
|
||||||
|
|
||||||
|
@ -33,8 +38,8 @@ router.get("/api/connect", ctx => {
|
||||||
socket.send(
|
socket.send(
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
t: "init",
|
t: "init",
|
||||||
ops: pt.operations.map(it => CausalTree.toWeakOp(it)),
|
ops: compressOps(pt.operations),
|
||||||
you: crypto.randomUUID(),
|
you: connectionId,
|
||||||
} satisfies Packet),
|
} satisfies Packet),
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue