From f42f44391f7240da3990b465cd999b8734e923bd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Nov 2017 09:51:12 +0000 Subject: [PATCH] Fix /sync when we have no events (#341) * Fix /sync when we have no events We used a since token of 0 to mean that no token was given. However, naffka streams start at 0. This causes clients to get stuck spinning forever until an event is sent. This changes it so that we pass around pointers instead, with nil meaning a since token wasn't given. * Comment * Fix unit tests * Comments * Fix typo --- .../dendrite/syncapi/sync/notifier_test.go | 4 ++-- .../matrix-org/dendrite/syncapi/sync/request.go | 13 ++++++++----- .../matrix-org/dendrite/syncapi/sync/requestpool.go | 12 ++++++------ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 6ee25968..79c5a287 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -264,7 +264,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { return types.StreamPosition(0), fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, ) - case <-listener.GetNotifyChannel(req.since): + case <-listener.GetNotifyChannel(*req.since): p := listener.GetStreamPosition() return p, nil } @@ -282,7 +282,7 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { return syncRequest{ userID: userID, timeout: 1 * time.Minute, - since: since, + since: &since, wantFullState: false, limit: defaultTimelineLimit, log: util.GetLogger(context.TODO()), diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 7dec55fe..7f525981 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -34,7 +34,7 @@ type syncRequest struct { userID string limit int timeout time.Duration - since types.StreamPosition + since *types.StreamPosition // nil means that no since token was supplied wantFullState bool log *log.Entry } @@ -70,13 +70,16 @@ func getTimeout(timeoutMS string) time.Duration { return time.Duration(i) * time.Millisecond } -func getSyncStreamPosition(since string) (types.StreamPosition, error) { +// getSyncStreamPosition tries to parse a 'since' token taken from the API to a +// stream position. If the string is empty then (nil, nil) is returned. +func getSyncStreamPosition(since string) (*types.StreamPosition, error) { if since == "" { - return types.StreamPosition(0), nil + return nil, nil } i, err := strconv.Atoi(since) if err != nil { - return types.StreamPosition(0), err + return nil, err } - return types.StreamPosition(i), nil + token := types.StreamPosition(i) + return &token, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 3b677561..15993b77 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -64,7 +64,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos := rp.notifier.CurrentPosition() // If this is an initial sync or timeout=0 we return immediately - if syncReq.since == types.StreamPosition(0) || syncReq.timeout == 0 { + if syncReq.since == nil || syncReq.timeout == 0 { syncData, err := rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) @@ -93,7 +93,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype case <-timer.C: return util.JSONResponse{ Code: 200, - JSON: types.NewResponse(syncReq.since), + JSON: types.NewResponse(currPos), } // Or for the request to be cancelled case <-req.Context().Done(): @@ -121,10 +121,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { // TODO: handle ignored users - if req.since == types.StreamPosition(0) { + if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.userID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.userID, req.since, currentPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.userID, *req.since, currentPos, req.limit) } if err != nil { @@ -148,7 +148,7 @@ func (rp *RequestPool) appendAccountData( return nil, err } - if req.since == types.StreamPosition(0) { + if req.since == nil { // If this is the initial sync, we don't need to check if a data has // already been sent. Instead, we send the whole batch. var global []gomatrixserverlib.ClientEvent @@ -170,7 +170,7 @@ func (rp *RequestPool) appendAccountData( } // Sync is not initial, get all account data since the latest sync - dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, req.since, currentPos) + dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos) if err != nil { return nil, err }