diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 6ee25968..79c5a287 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -264,7 +264,7 @@ func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { return types.StreamPosition(0), fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, ) - case <-listener.GetNotifyChannel(req.since): + case <-listener.GetNotifyChannel(*req.since): p := listener.GetStreamPosition() return p, nil } @@ -282,7 +282,7 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { return syncRequest{ userID: userID, timeout: 1 * time.Minute, - since: since, + since: &since, wantFullState: false, limit: defaultTimelineLimit, log: util.GetLogger(context.TODO()), diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 7dec55fe..7f525981 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -34,7 +34,7 @@ type syncRequest struct { userID string limit int timeout time.Duration - since types.StreamPosition + since *types.StreamPosition // nil means that no since token was supplied wantFullState bool log *log.Entry } @@ -70,13 +70,16 @@ func getTimeout(timeoutMS string) time.Duration { return time.Duration(i) * time.Millisecond } -func getSyncStreamPosition(since string) (types.StreamPosition, error) { +// getSyncStreamPosition tries to parse a 'since' token taken from the API to a +// stream position. If the string is empty then (nil, nil) is returned. +func getSyncStreamPosition(since string) (*types.StreamPosition, error) { if since == "" { - return types.StreamPosition(0), nil + return nil, nil } i, err := strconv.Atoi(since) if err != nil { - return types.StreamPosition(0), err + return nil, err } - return types.StreamPosition(i), nil + token := types.StreamPosition(i) + return &token, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 3b677561..15993b77 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -64,7 +64,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos := rp.notifier.CurrentPosition() // If this is an initial sync or timeout=0 we return immediately - if syncReq.since == types.StreamPosition(0) || syncReq.timeout == 0 { + if syncReq.since == nil || syncReq.timeout == 0 { syncData, err := rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) @@ -93,7 +93,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype case <-timer.C: return util.JSONResponse{ Code: 200, - JSON: types.NewResponse(syncReq.since), + JSON: types.NewResponse(currPos), } // Or for the request to be cancelled case <-req.Context().Done(): @@ -121,10 +121,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { // TODO: handle ignored users - if req.since == types.StreamPosition(0) { + if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.userID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.userID, req.since, currentPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.userID, *req.since, currentPos, req.limit) } if err != nil { @@ -148,7 +148,7 @@ func (rp *RequestPool) appendAccountData( return nil, err } - if req.since == types.StreamPosition(0) { + if req.since == nil { // If this is the initial sync, we don't need to check if a data has // already been sent. Instead, we send the whole batch. var global []gomatrixserverlib.ClientEvent @@ -170,7 +170,7 @@ func (rp *RequestPool) appendAccountData( } // Sync is not initial, get all account data since the latest sync - dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, req.since, currentPos) + dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos) if err != nil { return nil, err }