Fix some edge cases with /sync (#302)
Including: - Handle timeout=0 correctly - Always return immediately on initial sync - Handle spurious wake ups from the notifiermain
parent
35b628f5bf
commit
bd07447abe
|
@ -100,8 +100,10 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForEvents blocks until there are new events for this request.
|
// WaitForEvents blocks until there are events for this request after sincePos.
|
||||||
func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
|
// In particular, it will return immediately if there are already events after
|
||||||
|
// sincePos for the request, but otherwise blocks waiting for new events.
|
||||||
|
func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) types.StreamPosition {
|
||||||
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
|
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
|
||||||
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
|
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
|
||||||
// - Incoming events wake requests for a matching room ID
|
// - Incoming events wake requests for a matching room ID
|
||||||
|
@ -117,7 +119,7 @@ func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition {
|
||||||
// TODO: We increment the stream position for any event, so it's possible that we return immediately
|
// TODO: We increment the stream position for any event, so it's possible that we return immediately
|
||||||
// with a pos which contains no new events for this user. We should probably re-wait for events
|
// with a pos which contains no new events for this user. We should probably re-wait for events
|
||||||
// automatically in this case.
|
// automatically in this case.
|
||||||
if req.since != currentPos {
|
if sincePos != currentPos {
|
||||||
n.streamLock.Unlock()
|
n.streamLock.Unlock()
|
||||||
return currentPos
|
return currentPos
|
||||||
}
|
}
|
||||||
|
@ -141,6 +143,11 @@ func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CurrentPosition returns the current stream position
|
||||||
|
func (n *Notifier) CurrentPosition() types.StreamPosition {
|
||||||
|
return n.currPos
|
||||||
|
}
|
||||||
|
|
||||||
// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
|
// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
|
||||||
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
|
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
|
||||||
// OnNewEvent (eg on startup) to prevent racing.
|
// OnNewEvent (eg on startup) to prevent racing.
|
||||||
|
|
|
@ -258,7 +258,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
||||||
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
|
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
|
||||||
done := make(chan types.StreamPosition, 1)
|
done := make(chan types.StreamPosition, 1)
|
||||||
go func() {
|
go func() {
|
||||||
newPos := n.WaitForEvents(req)
|
newPos := n.WaitForEvents(req, req.since)
|
||||||
done <- newPos
|
done <- newPos
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -62,48 +62,76 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
"timeout": syncReq.timeout,
|
"timeout": syncReq.timeout,
|
||||||
}).Info("Incoming /sync request")
|
}).Info("Incoming /sync request")
|
||||||
|
|
||||||
// Fork off 2 goroutines: one to do the work, and one to serve as a timeout.
|
currPos := rp.notifier.CurrentPosition()
|
||||||
// Whichever returns first is the one we will serve back to the client.
|
|
||||||
timeoutChan := make(chan struct{})
|
|
||||||
timer := time.AfterFunc(syncReq.timeout, func() {
|
|
||||||
close(timeoutChan) // signal that the timeout has expired
|
|
||||||
})
|
|
||||||
|
|
||||||
done := make(chan util.JSONResponse)
|
// If this is an initial sync or timeout=0 we return immediately
|
||||||
go func() {
|
if syncReq.since == types.StreamPosition(0) || syncReq.timeout == 0 {
|
||||||
currentPos := rp.notifier.WaitForEvents(*syncReq)
|
syncData, err := rp.currentSyncForUser(*syncReq, currPos)
|
||||||
// We stop the timer BEFORE calculating the response so the cpu work
|
|
||||||
// done to calculate the response is not timed. This stops us from
|
|
||||||
// doing lots of work then timing out and sending back an empty response.
|
|
||||||
timer.Stop()
|
|
||||||
syncData, err := rp.currentSyncForUser(*syncReq, currentPos)
|
|
||||||
var res util.JSONResponse
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
res = httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
} else {
|
|
||||||
syncData, err = rp.appendAccountData(syncData, device.UserID, *syncReq, currentPos)
|
|
||||||
if err != nil {
|
|
||||||
res = httputil.LogThenError(req, err)
|
|
||||||
} else {
|
|
||||||
res = util.JSONResponse{
|
|
||||||
Code: 200,
|
|
||||||
JSON: syncData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
done <- res
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-timeoutChan: // timeout fired
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: types.NewResponse(syncReq.since),
|
JSON: syncData,
|
||||||
}
|
}
|
||||||
case res := <-done: // received a response
|
|
||||||
return res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Otherwise, we wait for the notifier to tell us if something *may* have
|
||||||
|
// happened. We loop in case it turns out that nothing did happen.
|
||||||
|
|
||||||
|
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// Wait for notifier to wake us up
|
||||||
|
case currPos = <-rp.makeNotifyChannel(*syncReq, currPos):
|
||||||
|
// Or for timeout to expire
|
||||||
|
case <-timer.C:
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 200,
|
||||||
|
JSON: types.NewResponse(syncReq.since),
|
||||||
|
}
|
||||||
|
// Or for the request to be cancelled
|
||||||
|
case <-req.Context().Done():
|
||||||
|
return httputil.LogThenError(req, req.Context().Err())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note that we don't time out during calculation of sync
|
||||||
|
// response. This ensures that we don't waste the hard work
|
||||||
|
// of calculating the sync only to get timed out before we
|
||||||
|
// can respond
|
||||||
|
|
||||||
|
syncData, err := rp.currentSyncForUser(*syncReq, currPos)
|
||||||
|
if err != nil {
|
||||||
|
return httputil.LogThenError(req, err)
|
||||||
|
}
|
||||||
|
if !syncData.IsEmpty() {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 200,
|
||||||
|
JSON: syncData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeNotifyChannel returns a channel that produces the current stream position
|
||||||
|
// when there *may* be something to return to the client. Only produces a single
|
||||||
|
// value and then closes the channel.
|
||||||
|
func (rp *RequestPool) makeNotifyChannel(syncReq syncRequest, sincePos types.StreamPosition) chan types.StreamPosition {
|
||||||
|
notified := make(chan types.StreamPosition)
|
||||||
|
|
||||||
|
// TODO(#303): We need to ensure that WaitForEvents gets properly cancelled
|
||||||
|
// when the request is finished, or use some other mechanism to ensure we
|
||||||
|
// don't leak goroutines here
|
||||||
|
go (func() {
|
||||||
|
currentPos := rp.notifier.WaitForEvents(syncReq, sincePos)
|
||||||
|
notified <- currentPos
|
||||||
|
close(notified)
|
||||||
|
})()
|
||||||
|
|
||||||
|
return notified
|
||||||
}
|
}
|
||||||
|
|
||||||
type stateEventInStateResp struct {
|
type stateEventInStateResp struct {
|
||||||
|
@ -196,12 +224,20 @@ func (rp *RequestPool) OnIncomingStateTypeRequest(req *http.Request, roomID stri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
if req.since == types.StreamPosition(0) {
|
if req.since == types.StreamPosition(0) {
|
||||||
return rp.db.CompleteSync(req.ctx, req.userID, req.limit)
|
res, err = rp.db.CompleteSync(req.ctx, req.userID, req.limit)
|
||||||
|
} else {
|
||||||
|
res, err = rp.db.IncrementalSync(req.ctx, req.userID, req.since, currentPos, req.limit)
|
||||||
}
|
}
|
||||||
return rp.db.IncrementalSync(req.ctx, req.userID, req.since, currentPos, req.limit)
|
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err = rp.appendAccountData(res, req.userID, req, currentPos)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) appendAccountData(
|
func (rp *RequestPool) appendAccountData(
|
||||||
|
|
|
@ -74,6 +74,16 @@ func NewResponse(pos StreamPosition) *Response {
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsEmpty returns true if the response is empty, i.e. used to decided whether
|
||||||
|
// to return the response immediately to the client or to wait for more data.
|
||||||
|
func (r *Response) IsEmpty() bool {
|
||||||
|
return len(r.Rooms.Join) == 0 &&
|
||||||
|
len(r.Rooms.Invite) == 0 &&
|
||||||
|
len(r.Rooms.Leave) == 0 &&
|
||||||
|
len(r.AccountData.Events) == 0 &&
|
||||||
|
len(r.Presence.Events) == 0
|
||||||
|
}
|
||||||
|
|
||||||
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
||||||
type JoinResponse struct {
|
type JoinResponse struct {
|
||||||
State struct {
|
State struct {
|
||||||
|
|
Loading…
Reference in New Issue