Add support for logs in StreamingToken (#1229)
* Add support for logs in StreamingToken Tokens now end up looking like `s11_22|dl-0-123|ab-0-12224` where `dl` and `ab` are log names, `0` is the partition and `123` and `12224` are the offsets. * Also test reserialisation * s/|/./g so tokens url escape nicelymain
parent
9a5fb489c5
commit
0fdd4f14d1
|
@ -91,7 +91,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
||||||
}).Panicf("could not save account data")
|
}).Panicf("could not save account data")
|
||||||
}
|
}
|
||||||
|
|
||||||
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0))
|
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.NewStreamToken(pduPos, 0, nil))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
|
||||||
s.notifier.OnNewSendToDevice(
|
s.notifier.OnNewSendToDevice(
|
||||||
output.UserID,
|
output.UserID,
|
||||||
[]string{output.DeviceID},
|
[]string{output.DeviceID},
|
||||||
types.NewStreamToken(0, streamPos),
|
types.NewStreamToken(0, streamPos, nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -65,7 +65,7 @@ func (s *OutputTypingEventConsumer) Start() error {
|
||||||
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
|
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
|
||||||
s.notifier.OnNewEvent(
|
s.notifier.OnNewEvent(
|
||||||
nil, roomID, nil,
|
nil, roomID, nil,
|
||||||
types.NewStreamToken(0, types.StreamPosition(latestSyncPosition)),
|
types.NewStreamToken(0, types.StreamPosition(latestSyncPosition), nil),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -94,6 +94,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
||||||
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
|
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos))
|
s.notifier.OnNewEvent(nil, output.Event.RoomID, nil, types.NewStreamToken(0, typingPos, nil))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
syncingUser = "@alice:localhost"
|
syncingUser = "@alice:localhost"
|
||||||
|
emptyToken = types.NewStreamToken(0, 0, nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockKeyAPI struct{}
|
type mockKeyAPI struct{}
|
||||||
|
@ -167,7 +168,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
||||||
|
|
||||||
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
|
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -190,7 +191,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
||||||
|
|
||||||
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
|
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -213,7 +214,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
||||||
|
|
||||||
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
|
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -235,7 +236,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
||||||
|
|
||||||
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
|
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -294,7 +295,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
|
||||||
jr.Timeline.Events = roomTimelineEvents
|
jr.Timeline.Events = roomTimelineEvents
|
||||||
syncResponse.Rooms.Join[roomID] = jr
|
syncResponse.Rooms.Join[roomID] = jr
|
||||||
|
|
||||||
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
|
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -322,7 +323,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
|
||||||
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
||||||
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
||||||
|
|
||||||
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
|
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -407,7 +408,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
|
||||||
lr.Timeline.Events = roomEvents
|
lr.Timeline.Events = roomEvents
|
||||||
syncResponse.Rooms.Leave[roomID] = lr
|
syncResponse.Rooms.Leave[roomID] = lr
|
||||||
|
|
||||||
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
|
_, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,7 +158,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
}).Panicf("roomserver output log: write event failure")
|
}).Panicf("roomserver output log: write event failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0))
|
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
||||||
}).Panicf("roomserver output log: write invite failure")
|
}).Panicf("roomserver output log: write invite failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.notifier.OnNewEvent(&msg.Event, "", nil, types.NewStreamToken(pduPos, 0))
|
s.notifier.OnNewEvent(&msg.Event, "", nil, types.NewStreamToken(pduPos, 0, nil))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
}
|
}
|
||||||
// Notify any active sync requests that the invite has been retired.
|
// Notify any active sync requests that the invite has been retired.
|
||||||
// Invites share the same stream counter as PDUs
|
// Invites share the same stream counter as PDUs
|
||||||
s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0))
|
s.notifier.OnNewEvent(nil, "", []string{msg.TargetUserID}, types.NewStreamToken(sp, 0, nil))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -434,7 +434,7 @@ func (d *Database) syncPositionTx(
|
||||||
if maxInviteID > maxEventID {
|
if maxInviteID > maxEventID {
|
||||||
maxEventID = maxInviteID
|
maxEventID = maxInviteID
|
||||||
}
|
}
|
||||||
sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()))
|
sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -731,7 +731,7 @@ func (d *Database) CompleteSync(
|
||||||
|
|
||||||
// Use a zero value SyncPosition for fromPos so all EDU states are added.
|
// Use a zero value SyncPosition for fromPos so all EDU states are added.
|
||||||
err = d.addEDUDeltaToResponse(
|
err = d.addEDUDeltaToResponse(
|
||||||
types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
|
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -163,7 +163,7 @@ func TestSyncResponse(t *testing.T) {
|
||||||
Name: "IncrementalSync penultimate",
|
Name: "IncrementalSync penultimate",
|
||||||
DoSync: func() (*types.Response, error) {
|
DoSync: func() (*types.Response, error) {
|
||||||
from := types.NewStreamToken( // pretend we are at the penultimate event
|
from := types.NewStreamToken( // pretend we are at the penultimate event
|
||||||
positions[len(positions)-2], types.StreamPosition(0),
|
positions[len(positions)-2], types.StreamPosition(0), nil,
|
||||||
)
|
)
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
|
return db.IncrementalSync(ctx, res, testUserDeviceA, from, latest, 5, false)
|
||||||
|
@ -176,7 +176,7 @@ func TestSyncResponse(t *testing.T) {
|
||||||
Name: "IncrementalSync limited",
|
Name: "IncrementalSync limited",
|
||||||
DoSync: func() (*types.Response, error) {
|
DoSync: func() (*types.Response, error) {
|
||||||
from := types.NewStreamToken( // pretend we are 10 events behind
|
from := types.NewStreamToken( // pretend we are 10 events behind
|
||||||
positions[len(positions)-11], types.StreamPosition(0),
|
positions[len(positions)-11], types.StreamPosition(0), nil,
|
||||||
)
|
)
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
// limit is set to 5
|
// limit is set to 5
|
||||||
|
@ -219,7 +219,7 @@ func TestSyncResponse(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
st.Fatalf("failed to do sync: %s", err)
|
st.Fatalf("failed to do sync: %s", err)
|
||||||
}
|
}
|
||||||
next := types.NewStreamToken(latest.PDUPosition(), latest.EDUPosition())
|
next := types.NewStreamToken(latest.PDUPosition(), latest.EDUPosition(), nil)
|
||||||
if res.NextBatch != next.String() {
|
if res.NextBatch != next.String() {
|
||||||
st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String())
|
st.Errorf("NextBatch got %s want %s", res.NextBatch, next.String())
|
||||||
}
|
}
|
||||||
|
@ -243,7 +243,7 @@ func TestGetEventsInRangeWithPrevBatch(t *testing.T) {
|
||||||
t.Fatalf("failed to get SyncPosition: %s", err)
|
t.Fatalf("failed to get SyncPosition: %s", err)
|
||||||
}
|
}
|
||||||
from := types.NewStreamToken(
|
from := types.NewStreamToken(
|
||||||
positions[len(positions)-2], types.StreamPosition(0),
|
positions[len(positions)-2], types.StreamPosition(0), nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
|
@ -288,7 +288,7 @@ func TestGetEventsInRangeWithStreamToken(t *testing.T) {
|
||||||
t.Fatalf("failed to get SyncPosition: %s", err)
|
t.Fatalf("failed to get SyncPosition: %s", err)
|
||||||
}
|
}
|
||||||
// head towards the beginning of time
|
// head towards the beginning of time
|
||||||
to := types.NewStreamToken(0, 0)
|
to := types.NewStreamToken(0, 0, nil)
|
||||||
|
|
||||||
// backpaginate 5 messages starting at the latest position.
|
// backpaginate 5 messages starting at the latest position.
|
||||||
paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true)
|
paginatedEvents, err := db.GetEventsInStreamingRange(ctx, &latest, &to, testRoomID, 5, true)
|
||||||
|
@ -531,14 +531,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
|
||||||
|
|
||||||
// At this point there should be no messages. We haven't sent anything
|
// At this point there should be no messages. We haven't sent anything
|
||||||
// yet.
|
// yet.
|
||||||
events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0))
|
events, updates, deletions, err := db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, 0, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 {
|
if len(events) != 0 || len(updates) != 0 || len(deletions) != 0 {
|
||||||
t.Fatal("first call should have no updates")
|
t.Fatal("first call should have no updates")
|
||||||
}
|
}
|
||||||
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0))
|
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, 0, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -556,14 +556,14 @@ func TestSendToDeviceBehaviour(t *testing.T) {
|
||||||
// At this point we should get exactly one message. We're sending the sync position
|
// At this point we should get exactly one message. We're sending the sync position
|
||||||
// that we were given from the update and the send-to-device update will be updated
|
// that we were given from the update and the send-to-device update will be updated
|
||||||
// in the database to reflect that this was the sync position we sent the message at.
|
// in the database to reflect that this was the sync position we sent the message at.
|
||||||
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos))
|
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 {
|
if len(events) != 1 || len(updates) != 1 || len(deletions) != 0 {
|
||||||
t.Fatal("second call should have one update")
|
t.Fatal("second call should have one update")
|
||||||
}
|
}
|
||||||
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos))
|
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -571,35 +571,35 @@ func TestSendToDeviceBehaviour(t *testing.T) {
|
||||||
// At this point we should still have one message because we haven't progressed the
|
// At this point we should still have one message because we haven't progressed the
|
||||||
// sync position yet. This is equivalent to the client failing to /sync and retrying
|
// sync position yet. This is equivalent to the client failing to /sync and retrying
|
||||||
// with the same position.
|
// with the same position.
|
||||||
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos))
|
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 {
|
if len(events) != 1 || len(updates) != 0 || len(deletions) != 0 {
|
||||||
t.Fatal("third call should have one update still")
|
t.Fatal("third call should have one update still")
|
||||||
}
|
}
|
||||||
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos))
|
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// At this point we should now have no updates, because we've progressed the sync
|
// At this point we should now have no updates, because we've progressed the sync
|
||||||
// position. Therefore the update from before will not be sent again.
|
// position. Therefore the update from before will not be sent again.
|
||||||
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+1))
|
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+1, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 {
|
if len(events) != 0 || len(updates) != 0 || len(deletions) != 1 {
|
||||||
t.Fatal("fourth call should have no updates")
|
t.Fatal("fourth call should have no updates")
|
||||||
}
|
}
|
||||||
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos+1))
|
err = db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, types.NewStreamToken(0, streamPos+1, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// At this point we should still have no updates, because no new updates have been
|
// At this point we should still have no updates, because no new updates have been
|
||||||
// sent.
|
// sent.
|
||||||
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+2))
|
events, updates, deletions, err = db.SendToDeviceUpdatesForSync(ctx, "alice", "one", types.NewStreamToken(0, streamPos+2, nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -636,7 +636,7 @@ func TestInviteBehaviour(t *testing.T) {
|
||||||
}
|
}
|
||||||
// both invite events should appear in a new sync
|
// both invite events should appear in a new sync
|
||||||
beforeRetireRes := types.NewResponse()
|
beforeRetireRes := types.NewResponse()
|
||||||
beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false)
|
beforeRetireRes, err = db.IncrementalSync(ctx, beforeRetireRes, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("IncrementalSync failed: %s", err)
|
t.Fatalf("IncrementalSync failed: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -651,7 +651,7 @@ func TestInviteBehaviour(t *testing.T) {
|
||||||
t.Fatalf("failed to get SyncPosition: %s", err)
|
t.Fatalf("failed to get SyncPosition: %s", err)
|
||||||
}
|
}
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0), latest, 0, false)
|
res, err = db.IncrementalSync(ctx, res, testUserDeviceA, types.NewStreamToken(0, 0, nil), latest, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("IncrementalSync failed: %s", err)
|
t.Fatalf("IncrementalSync failed: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,11 +32,11 @@ var (
|
||||||
randomMessageEvent gomatrixserverlib.HeaderedEvent
|
randomMessageEvent gomatrixserverlib.HeaderedEvent
|
||||||
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
|
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
|
||||||
bobLeaveEvent gomatrixserverlib.HeaderedEvent
|
bobLeaveEvent gomatrixserverlib.HeaderedEvent
|
||||||
syncPositionVeryOld = types.NewStreamToken(5, 0)
|
syncPositionVeryOld = types.NewStreamToken(5, 0, nil)
|
||||||
syncPositionBefore = types.NewStreamToken(11, 0)
|
syncPositionBefore = types.NewStreamToken(11, 0, nil)
|
||||||
syncPositionAfter = types.NewStreamToken(12, 0)
|
syncPositionAfter = types.NewStreamToken(12, 0, nil)
|
||||||
syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition(), 1)
|
syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition(), 1, nil)
|
||||||
syncPositionAfter2 = types.NewStreamToken(13, 0)
|
syncPositionAfter2 = types.NewStreamToken(13, 0, nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -65,7 +65,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
since = &tok
|
since = &tok
|
||||||
}
|
}
|
||||||
if since == nil {
|
if since == nil {
|
||||||
tok := types.NewStreamToken(0, 0)
|
tok := types.NewStreamToken(0, 0, nil)
|
||||||
since = &tok
|
since = &tok
|
||||||
}
|
}
|
||||||
timelineLimit := DefaultTimelineLimit
|
timelineLimit := DefaultTimelineLimit
|
||||||
|
|
|
@ -138,7 +138,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (res *types.Response, err error) {
|
||||||
res = types.NewResponse()
|
res = types.NewResponse()
|
||||||
|
|
||||||
since := types.NewStreamToken(0, 0)
|
since := types.NewStreamToken(0, 0, nil)
|
||||||
if req.since != nil {
|
if req.since != nil {
|
||||||
since = *req.since
|
since = *req.since
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,23 @@ var (
|
||||||
// StreamPosition represents the offset in the sync stream a client is at.
|
// StreamPosition represents the offset in the sync stream a client is at.
|
||||||
type StreamPosition int64
|
type StreamPosition int64
|
||||||
|
|
||||||
|
// LogPosition represents the offset in a Kafka log a client is at.
|
||||||
|
type LogPosition struct {
|
||||||
|
Partition int32
|
||||||
|
Offset int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsAfter returns true if this position is after `lp`.
|
||||||
|
func (p *LogPosition) IsAfter(lp *LogPosition) bool {
|
||||||
|
if lp == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if p.Partition != lp.Partition {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return p.Offset > lp.Offset
|
||||||
|
}
|
||||||
|
|
||||||
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
|
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
|
||||||
type StreamEvent struct {
|
type StreamEvent struct {
|
||||||
gomatrixserverlib.HeaderedEvent
|
gomatrixserverlib.HeaderedEvent
|
||||||
|
@ -90,6 +107,15 @@ const (
|
||||||
|
|
||||||
type StreamingToken struct {
|
type StreamingToken struct {
|
||||||
syncToken
|
syncToken
|
||||||
|
logs map[string]*LogPosition
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *StreamingToken) Log(name string) *LogPosition {
|
||||||
|
l, ok := t.logs[name]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *StreamingToken) PDUPosition() StreamPosition {
|
func (t *StreamingToken) PDUPosition() StreamPosition {
|
||||||
|
@ -99,7 +125,15 @@ func (t *StreamingToken) EDUPosition() StreamPosition {
|
||||||
return t.Positions[1]
|
return t.Positions[1]
|
||||||
}
|
}
|
||||||
func (t *StreamingToken) String() string {
|
func (t *StreamingToken) String() string {
|
||||||
return t.syncToken.String()
|
logStrings := []string{
|
||||||
|
t.syncToken.String(),
|
||||||
|
}
|
||||||
|
for name, lp := range t.logs {
|
||||||
|
logStr := fmt.Sprintf("%s-%d-%d", name, lp.Partition, lp.Offset)
|
||||||
|
logStrings = append(logStrings, logStr)
|
||||||
|
}
|
||||||
|
// E.g s11_22_33.dl0-134.ab1-441
|
||||||
|
return strings.Join(logStrings, ".")
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsAfter returns true if ANY position in this token is greater than `other`.
|
// IsAfter returns true if ANY position in this token is greater than `other`.
|
||||||
|
@ -109,12 +143,22 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for name := range t.logs {
|
||||||
|
otherLog := other.Log(name)
|
||||||
|
if otherLog == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if t.logs[name].IsAfter(otherLog) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
|
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
|
||||||
// If the latter StreamingToken contains a field that is not 0, it is considered an update,
|
// If the latter StreamingToken contains a field that is not 0, it is considered an update,
|
||||||
// and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called.
|
// and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called.
|
||||||
|
// If the other token has a log, they will replace any existing log on this token.
|
||||||
func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) {
|
func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) {
|
||||||
ret.Type = t.Type
|
ret.Type = t.Type
|
||||||
ret.Positions = make([]StreamPosition, len(t.Positions))
|
ret.Positions = make([]StreamPosition, len(t.Positions))
|
||||||
|
@ -125,6 +169,13 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken)
|
||||||
}
|
}
|
||||||
ret.Positions[i] = other.Positions[i]
|
ret.Positions[i] = other.Positions[i]
|
||||||
}
|
}
|
||||||
|
for name := range t.logs {
|
||||||
|
otherLog := other.Log(name)
|
||||||
|
if otherLog == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.logs[name] = otherLog
|
||||||
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,7 +190,7 @@ func (t *TopologyToken) PDUPosition() StreamPosition {
|
||||||
return t.Positions[1]
|
return t.Positions[1]
|
||||||
}
|
}
|
||||||
func (t *TopologyToken) StreamToken() StreamingToken {
|
func (t *TopologyToken) StreamToken() StreamingToken {
|
||||||
return NewStreamToken(t.PDUPosition(), 0)
|
return NewStreamToken(t.PDUPosition(), 0, nil)
|
||||||
}
|
}
|
||||||
func (t *TopologyToken) String() string {
|
func (t *TopologyToken) String() string {
|
||||||
return t.syncToken.String()
|
return t.syncToken.String()
|
||||||
|
@ -174,9 +225,9 @@ func (t *TopologyToken) Decrement() {
|
||||||
// error if the token couldn't be parsed into an int64, or if the token type
|
// error if the token couldn't be parsed into an int64, or if the token type
|
||||||
// isn't a known type (returns ErrInvalidSyncTokenType in the latter
|
// isn't a known type (returns ErrInvalidSyncTokenType in the latter
|
||||||
// case).
|
// case).
|
||||||
func newSyncTokenFromString(s string) (token *syncToken, err error) {
|
func newSyncTokenFromString(s string) (token *syncToken, categories []string, err error) {
|
||||||
if len(s) == 0 {
|
if len(s) == 0 {
|
||||||
return nil, ErrInvalidSyncTokenLen
|
return nil, nil, ErrInvalidSyncTokenLen
|
||||||
}
|
}
|
||||||
|
|
||||||
token = new(syncToken)
|
token = new(syncToken)
|
||||||
|
@ -185,16 +236,17 @@ func newSyncTokenFromString(s string) (token *syncToken, err error) {
|
||||||
switch t := SyncTokenType(s[:1]); t {
|
switch t := SyncTokenType(s[:1]); t {
|
||||||
case SyncTokenTypeStream, SyncTokenTypeTopology:
|
case SyncTokenTypeStream, SyncTokenTypeTopology:
|
||||||
token.Type = t
|
token.Type = t
|
||||||
positions = strings.Split(s[1:], "_")
|
categories = strings.Split(s[1:], ".")
|
||||||
|
positions = strings.Split(categories[0], "_")
|
||||||
default:
|
default:
|
||||||
return nil, ErrInvalidSyncTokenType
|
return nil, nil, ErrInvalidSyncTokenType
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, pos := range positions {
|
for _, pos := range positions {
|
||||||
if posInt, err := strconv.ParseInt(pos, 10, 64); err != nil {
|
if posInt, err := strconv.ParseInt(pos, 10, 64); err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
} else if posInt < 0 {
|
} else if posInt < 0 {
|
||||||
return nil, errors.New("negative position not allowed")
|
return nil, nil, errors.New("negative position not allowed")
|
||||||
} else {
|
} else {
|
||||||
token.Positions = append(token.Positions, StreamPosition(posInt))
|
token.Positions = append(token.Positions, StreamPosition(posInt))
|
||||||
}
|
}
|
||||||
|
@ -215,7 +267,7 @@ func NewTopologyToken(depth, streamPos StreamPosition) TopologyToken {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
|
func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
|
||||||
t, err := newSyncTokenFromString(tok)
|
t, _, err := newSyncTokenFromString(tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -233,16 +285,20 @@ func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStreamToken creates a new sync token for /sync
|
// NewStreamToken creates a new sync token for /sync
|
||||||
func NewStreamToken(pduPos, eduPos StreamPosition) StreamingToken {
|
func NewStreamToken(pduPos, eduPos StreamPosition, logs map[string]*LogPosition) StreamingToken {
|
||||||
|
if logs == nil {
|
||||||
|
logs = make(map[string]*LogPosition)
|
||||||
|
}
|
||||||
return StreamingToken{
|
return StreamingToken{
|
||||||
syncToken: syncToken{
|
syncToken: syncToken{
|
||||||
Type: SyncTokenTypeStream,
|
Type: SyncTokenTypeStream,
|
||||||
Positions: []StreamPosition{pduPos, eduPos},
|
Positions: []StreamPosition{pduPos, eduPos},
|
||||||
},
|
},
|
||||||
|
logs: logs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
|
func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
|
||||||
t, err := newSyncTokenFromString(tok)
|
t, categories, err := newSyncTokenFromString(tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -254,8 +310,35 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
|
||||||
err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions))
|
err = fmt.Errorf("token %s wrong number of values, got %d want at least 2", tok, len(t.Positions))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
logs := make(map[string]*LogPosition)
|
||||||
|
if len(categories) > 1 {
|
||||||
|
// dl-0-1234
|
||||||
|
// $log_name-$partition-$offset
|
||||||
|
for _, logStr := range categories[1:] {
|
||||||
|
segments := strings.Split(logStr, "-")
|
||||||
|
if len(segments) != 3 {
|
||||||
|
err = fmt.Errorf("token %s - invalid log: %s", tok, logStr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var partition int64
|
||||||
|
partition, err = strconv.ParseInt(segments[1], 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var offset int64
|
||||||
|
offset, err = strconv.ParseInt(segments[2], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logs[segments[0]] = &LogPosition{
|
||||||
|
Partition: int32(partition),
|
||||||
|
Offset: offset,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return StreamingToken{
|
return StreamingToken{
|
||||||
syncToken: *t,
|
syncToken: *t,
|
||||||
|
logs: logs,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,61 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
import "testing"
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewSyncTokenWithLogs(t *testing.T) {
|
||||||
|
tests := map[string]*StreamingToken{
|
||||||
|
"s4_0": &StreamingToken{
|
||||||
|
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
|
||||||
|
logs: make(map[string]*LogPosition),
|
||||||
|
},
|
||||||
|
"s4_0.dl-0-123": &StreamingToken{
|
||||||
|
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
|
||||||
|
logs: map[string]*LogPosition{
|
||||||
|
"dl": &LogPosition{
|
||||||
|
Partition: 0,
|
||||||
|
Offset: 123,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"s4_0.dl-0-123.ab-1-14419482332": &StreamingToken{
|
||||||
|
syncToken: syncToken{Type: "s", Positions: []StreamPosition{4, 0}},
|
||||||
|
logs: map[string]*LogPosition{
|
||||||
|
"ab": &LogPosition{
|
||||||
|
Partition: 1,
|
||||||
|
Offset: 14419482332,
|
||||||
|
},
|
||||||
|
"dl": &LogPosition{
|
||||||
|
Partition: 0,
|
||||||
|
Offset: 123,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for tok, want := range tests {
|
||||||
|
got, err := NewStreamTokenFromString(tok)
|
||||||
|
if err != nil {
|
||||||
|
if want == nil {
|
||||||
|
continue // error expected
|
||||||
|
}
|
||||||
|
t.Errorf("%s errored: %s", tok, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, *want) {
|
||||||
|
t.Errorf("%s mismatch: got %v want %v", tok, got, want)
|
||||||
|
}
|
||||||
|
if got.String() != tok {
|
||||||
|
t.Errorf("%s reserialisation mismatch: got %s want %s", tok, got.String(), tok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestNewSyncTokenFromString(t *testing.T) {
|
func TestNewSyncTokenFromString(t *testing.T) {
|
||||||
shouldPass := map[string]syncToken{
|
shouldPass := map[string]syncToken{
|
||||||
"s4_0": NewStreamToken(4, 0).syncToken,
|
"s4_0": NewStreamToken(4, 0, nil).syncToken,
|
||||||
"s3_1": NewStreamToken(3, 1).syncToken,
|
"s3_1": NewStreamToken(3, 1, nil).syncToken,
|
||||||
"t3_1": NewTopologyToken(3, 1).syncToken,
|
"t3_1": NewTopologyToken(3, 1).syncToken,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,7 +71,7 @@ func TestNewSyncTokenFromString(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for test, expected := range shouldPass {
|
for test, expected := range shouldPass {
|
||||||
result, err := newSyncTokenFromString(test)
|
result, _, err := newSyncTokenFromString(test)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -31,7 +81,7 @@ func TestNewSyncTokenFromString(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range shouldFail {
|
for _, test := range shouldFail {
|
||||||
if _, err := newSyncTokenFromString(test); err == nil {
|
if _, _, err := newSyncTokenFromString(test); err == nil {
|
||||||
t.Errorf("input '%v' should have errored but didn't", test)
|
t.Errorf("input '%v' should have errored but didn't", test)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue