Fix the sync api returning an empty sync response when reaching timeout, regardless of the since token (#595)

* Fix the sync api returning an empty sync response when reaching timeout, regardless of the since token

* Declare syncData from the start to avoid declaration shadowing

* Bump Kafka version
Since this is a huge bump, it might completely blow up - I'll revert it if that's the case

* Put the loop and the IsEmpty() check back in sync handler

* Add doc and allow returning if timeout is reached
main
Brendan Abolivier 2018-12-18 17:56:08 +00:00 committed by GitHub
parent 2133e6bf59
commit 170264404e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 10 deletions

View File

@ -9,9 +9,9 @@ cd `dirname $0`/..
mkdir -p .downloads mkdir -p .downloads
# The mirror to download kafka from is picked from the list of mirrors at # The mirror to download kafka from is picked from the list of mirrors at
# https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.11.0.2.tgz # https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# TODO: Check the signature since we are downloading over HTTP. # TODO: Check the signature since we are downloading over HTTP.
MIRROR=http://apache.mirror.anlx.net/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz MIRROR=http://apache.mirror.anlx.net/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# Only download the kafka if it isn't already downloaded. # Only download the kafka if it isn't already downloaded.
test -f .downloads/kafka.tgz || wget $MIRROR -O .downloads/kafka.tgz test -f .downloads/kafka.tgz || wget $MIRROR -O .downloads/kafka.tgz

View File

@ -45,6 +45,8 @@ func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.D
// called in a dedicated goroutine for this request. This function will block the goroutine // called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out. // until a response is ready, or it times out.
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse { func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtypes.Device) util.JSONResponse {
var syncData *types.Response
// Extract values from request // Extract values from request
logger := util.GetLogger(req.Context()) logger := util.GetLogger(req.Context())
userID := device.UserID userID := device.UserID
@ -65,7 +67,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
// If this is an initial sync or timeout=0 we return immediately // If this is an initial sync or timeout=0 we return immediately
if syncReq.since == nil || syncReq.timeout == 0 { if syncReq.since == nil || syncReq.timeout == 0 {
syncData, err := rp.currentSyncForUser(*syncReq, currPos) syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -84,6 +86,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
userStreamListener := rp.notifier.GetListener(*syncReq) userStreamListener := rp.notifier.GetListener(*syncReq)
defer userStreamListener.Close() defer userStreamListener.Close()
// We need the loop in case userStreamListener wakes up even if there isn't
// anything to send down. In this case, we'll jump out of the select but
// don't want to send anything back until we get some actual content to
// respond with, so we skip the return an go back to waiting for content to
// be sent down or the request timing out.
var hasTimedOut bool
for { for {
select { select {
// Wait for notifier to wake us up // Wait for notifier to wake us up
@ -91,10 +99,11 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
currPos = userStreamListener.GetStreamPosition() currPos = userStreamListener.GetStreamPosition()
// Or for timeout to expire // Or for timeout to expire
case <-timer.C: case <-timer.C:
return util.JSONResponse{ // We just need to ensure we get out of the select after reaching the
Code: http.StatusOK, // timeout, but there's nothing specific we want to do in this case
JSON: types.NewResponse(currPos), // apart from that, so we do nothing except stating we're timing out
} // and need to respond.
hasTimedOut = true
// Or for the request to be cancelled // Or for the request to be cancelled
case <-req.Context().Done(): case <-req.Context().Done():
return httputil.LogThenError(req, req.Context().Err()) return httputil.LogThenError(req, req.Context().Err())
@ -105,17 +114,17 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
// of calculating the sync only to get timed out before we // of calculating the sync only to get timed out before we
// can respond // can respond
syncData, err := rp.currentSyncForUser(*syncReq, currPos) syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
if !syncData.IsEmpty() {
if !syncData.IsEmpty() || hasTimedOut {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: syncData, JSON: syncData,
} }
} }
} }
} }