Refactor Notifier to return channel (#311)

This has two benefits:

1. Using channels makes it easier to time out while waiting
2. Allows us to clean up goroutines that were waiting if we timeout the
   request
main
Erik Johnston 2017-10-26 11:34:54 +01:00 committed by GitHub
parent e5944e0fdb
commit 49c040c89f
4 changed files with 170 additions and 86 deletions

View File

@ -17,11 +17,12 @@ package sync
import ( import (
"context" "context"
"sync" "sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
) )
// Notifier will wake up sleeping requests when there is some new data. // Notifier will wake up sleeping requests when there is some new data.
@ -38,6 +39,8 @@ type Notifier struct {
currPos types.StreamPosition currPos types.StreamPosition
// A map of user_id => UserStream which can be used to wake a given user's /sync request. // A map of user_id => UserStream which can be used to wake a given user's /sync request.
userStreams map[string]*UserStream userStreams map[string]*UserStream
// The last time we cleaned out stale entries from the userStreams map
lastCleanUpTime time.Time
} }
// NewNotifier creates a new notifier set to the given stream position. // NewNotifier creates a new notifier set to the given stream position.
@ -49,6 +52,7 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
roomIDToJoinedUsers: make(map[string]userIDSet), roomIDToJoinedUsers: make(map[string]userIDSet),
userStreams: make(map[string]*UserStream), userStreams: make(map[string]*UserStream),
streamLock: &sync.Mutex{}, streamLock: &sync.Mutex{},
lastCleanUpTime: time.Now(),
} }
} }
@ -63,6 +67,8 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
defer n.streamLock.Unlock() defer n.streamLock.Unlock()
n.currPos = pos n.currPos = pos
n.removeEmptyUserStreams()
if ev != nil { if ev != nil {
// Map this event's room_id to a list of joined users, and wake them up. // Map this event's room_id to a list of joined users, and wake them up.
userIDs := n.joinedUsers(ev.RoomID()) userIDs := n.joinedUsers(ev.RoomID())
@ -100,10 +106,10 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
} }
} }
// WaitForEvents blocks until there are events for this request after sincePos. // GetListener returns a UserStreamListener that can be used to wait for
// In particular, it will return immediately if there are already events after // updates for a user. Must be closed.
// sincePos for the request, but otherwise blocks waiting for new events. // notify for anything before sincePos
func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) types.StreamPosition { func (n *Notifier) GetListener(req syncRequest) UserStreamListener {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID // - Incoming events wake requests for a matching room ID
@ -112,25 +118,12 @@ func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition)
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
// but given we don't do /events, let's pretend it doesn't exist. // but given we don't do /events, let's pretend it doesn't exist.
// In a guard, check if the /sync request should block, and block it until we get woken up
n.streamLock.Lock() n.streamLock.Lock()
currentPos := n.currPos defer n.streamLock.Unlock()
// TODO: We increment the stream position for any event, so it's possible that we return immediately n.removeEmptyUserStreams()
// with a pos which contains no new events for this user. We should probably re-wait for events
// automatically in this case.
if sincePos != currentPos {
n.streamLock.Unlock()
return currentPos
}
// wait to be woken up, and then re-check the stream position return n.fetchUserStream(req.userID, true).GetListener(req.ctx)
req.log.WithField("user_id", req.userID).Info("Waiting for event")
// give up the stream lock prior to waiting on the user lock
stream := n.fetchUserStream(req.userID, true)
n.streamLock.Unlock()
return stream.Wait(currentPos)
} }
// Load the membership states required to notify users correctly. // Load the membership states required to notify users correctly.
@ -178,7 +171,7 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr
stream, ok := n.userStreams[userID] stream, ok := n.userStreams[userID]
if !ok && makeIfNotExists { if !ok && makeIfNotExists {
// TODO: Unbounded growth of streams (1 per user) // TODO: Unbounded growth of streams (1 per user)
stream = NewUserStream(userID) stream = NewUserStream(userID, n.currPos)
n.userStreams[userID] = stream n.userStreams[userID] = stream
} }
return stream return stream
@ -208,6 +201,29 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
return n.roomIDToJoinedUsers[roomID].values() return n.roomIDToJoinedUsers[roomID].values()
} }
// removeEmptyUserStreams iterates through the user stream map and removes any
// that have been empty for a certain amount of time. This is a crude way of
// ensuring that the userStreams map doesn't grow forver.
// This should be called when the notifier gets called for whatever reason,
// the function itself is responsible for ensuring it doesn't iterate too
// often.
// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) removeEmptyUserStreams() {
// Only clean up now and again
now := time.Now()
if n.lastCleanUpTime.Add(time.Minute).After(now) {
return
}
n.lastCleanUpTime = now
deleteBefore := now.Add(-5 * time.Minute)
for key, value := range n.userStreams {
if value.TimeOfLastNonEmpty().Before(deleteBefore) {
delete(n.userStreams, key)
}
}
}
// A string set, mainly existing for improving clarity of structs in this file. // A string set, mainly existing for improving clarity of structs in this file.
type userIDSet map[string]bool type userIDSet map[string]bool

View File

@ -256,24 +256,22 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// same as Notifier.WaitForEvents but with a timeout. // same as Notifier.WaitForEvents but with a timeout.
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
done := make(chan types.StreamPosition, 1) listener := n.GetListener(req)
go func() { defer listener.Close()
newPos := n.WaitForEvents(req, req.since)
done <- newPos
close(done)
}()
select { select {
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
return types.StreamPosition(0), fmt.Errorf( return types.StreamPosition(0), fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since,
) )
case p := <-done: case <-listener.GetNotifyChannel(req.since):
p := listener.GetStreamPosition()
return p, nil return p, nil
} }
} }
// Wait until something is Wait()ing on the user stream. // Wait until something is Wait()ing on the user stream.
func waitForBlocking(s *UserStream, numBlocking int) { func waitForBlocking(s *UserStream, numBlocking uint) {
for numBlocking != s.NumWaiting() { for numBlocking != s.NumWaiting() {
// This is horrible but I don't want to add a signalling mechanism JUST for testing. // This is horrible but I don't want to add a signalling mechanism JUST for testing.
time.Sleep(1 * time.Microsecond) time.Sleep(1 * time.Microsecond)
@ -288,5 +286,6 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
wantFullState: false, wantFullState: false,
limit: defaultTimelineLimit, limit: defaultTimelineLimit,
log: util.GetLogger(context.TODO()), log: util.GetLogger(context.TODO()),
ctx: context.TODO(),
} }
} }

View File

@ -19,7 +19,6 @@ import (
"net/http" "net/http"
"time" "time"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
@ -28,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
) )
// RequestPool manages HTTP long-poll connections for /sync // RequestPool manages HTTP long-poll connections for /sync
@ -82,10 +82,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
defer timer.Stop() defer timer.Stop()
userStreamListener := rp.notifier.GetListener(*syncReq)
defer userStreamListener.Close()
for { for {
select { select {
// Wait for notifier to wake us up // Wait for notifier to wake us up
case currPos = <-rp.makeNotifyChannel(*syncReq, currPos): case <-userStreamListener.GetNotifyChannel(currPos):
currPos = userStreamListener.GetStreamPosition()
// Or for timeout to expire // Or for timeout to expire
case <-timer.C: case <-timer.C:
return util.JSONResponse{ return util.JSONResponse{
@ -116,24 +120,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
} }
} }
// makeNotifyChannel returns a channel that produces the current stream position
// when there *may* be something to return to the client. Only produces a single
// value and then closes the channel.
func (rp *RequestPool) makeNotifyChannel(syncReq syncRequest, sincePos types.StreamPosition) chan types.StreamPosition {
notified := make(chan types.StreamPosition)
// TODO(#303): We need to ensure that WaitForEvents gets properly cancelled
// when the request is finished, or use some other mechanism to ensure we
// don't leak goroutines here
go (func() {
currentPos := rp.notifier.WaitForEvents(syncReq, sincePos)
notified <- currentPos
close(notified)
})()
return notified
}
type stateEventInStateResp struct { type stateEventInStateResp struct {
gomatrixserverlib.ClientEvent gomatrixserverlib.ClientEvent
PrevContent json.RawMessage `json:"prev_content,omitempty"` PrevContent json.RawMessage `json:"prev_content,omitempty"`

View File

@ -15,65 +15,148 @@
package sync package sync
import ( import (
"context"
"runtime"
"sync" "sync"
"time"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/util"
) )
// UserStream represents a communication mechanism between the /sync request goroutine // UserStream represents a communication mechanism between the /sync request goroutine
// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and // and the underlying sync server goroutines.
// goroutines can Broadcast(streamPosition) to other goroutines. // Goroutines can get a UserStreamListener to wait for updates, and can Broadcast()
// updates.
type UserStream struct { type UserStream struct {
UserID string UserID string
// Because this is a Cond, we can notify all waiting goroutines so this works // The lock that protects changes to this struct
// across devices for the same user. Protects pos. lock sync.Mutex
cond *sync.Cond // Closed when there is an update.
// The position to broadcast to callers of Wait(). signalChannel chan struct{}
// The last stream position that there may have been an update for the suser
pos types.StreamPosition pos types.StreamPosition
// The number of goroutines blocked on Wait() - used for testing and metrics // The last time when we had some listeners waiting
numWaiting int timeOfLastChannel time.Time
// The number of listeners waiting
numWaiting uint
}
// UserStreamListener allows a sync request to wait for updates for a user.
type UserStreamListener struct {
userStream *UserStream
// Whether the stream has been closed
hasClosed bool
} }
// NewUserStream creates a new user stream // NewUserStream creates a new user stream
func NewUserStream(userID string) *UserStream { func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
return &UserStream{ return &UserStream{
UserID: userID, UserID: userID,
cond: sync.NewCond(&sync.Mutex{}), timeOfLastChannel: time.Now(),
pos: currPos,
signalChannel: make(chan struct{}),
} }
} }
// Wait blocks until there is a new stream position for this user, which is then returned. // GetListener returns UserStreamListener that a sync request can use to wait
// waitAtPos should be the position the stream thinks it should be waiting at. // for new updates with.
func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) { // UserStreamListener must be closed
s.cond.L.Lock() func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
// Before we start blocking, we need to make sure that we didn't race with a call s.lock.Lock()
// to Broadcast() between calling Wait() and actually sleeping. We check the last defer s.lock.Unlock()
// broadcast pos to see if it is newer than the pos we are meant to wait at. If it
// is newer, something has Broadcast to this stream more recently so return immediately. s.numWaiting++ // We decrement when UserStreamListener is closed
if s.pos > waitAtPos {
pos = s.pos listener := UserStreamListener{
s.cond.L.Unlock() userStream: s,
return
} }
s.numWaiting++
s.cond.Wait() // Lets be a bit paranoid here and check that Close() is being called
pos = s.pos runtime.SetFinalizer(&listener, func(l *UserStreamListener) {
s.numWaiting-- if !l.hasClosed {
s.cond.L.Unlock() util.GetLogger(ctx).Warn("Didn't call Close on UserStreamListener")
return l.Close()
}
})
return listener
} }
// Broadcast a new stream position for this user. // Broadcast a new stream position for this user.
func (s *UserStream) Broadcast(pos types.StreamPosition) { func (s *UserStream) Broadcast(pos types.StreamPosition) {
s.cond.L.Lock() s.lock.Lock()
defer s.lock.Unlock()
s.pos = pos s.pos = pos
s.cond.L.Unlock()
s.cond.Broadcast() close(s.signalChannel)
s.signalChannel = make(chan struct{})
} }
// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing. // NumWaiting returns the number of goroutines waiting for waiting for updates.
func (s *UserStream) NumWaiting() int { // Used for metrics and testing.
s.cond.L.Lock() func (s *UserStream) NumWaiting() uint {
defer s.cond.L.Unlock() s.lock.Lock()
defer s.lock.Unlock()
return s.numWaiting return s.numWaiting
} }
// TimeOfLastNonEmpty returns the last time that the number of waiting listeners
// was non-empty, may be time.Now() if number of waiting listeners is currently
// non-empty.
func (s *UserStream) TimeOfLastNonEmpty() time.Time {
s.lock.Lock()
defer s.lock.Unlock()
if s.numWaiting > 0 {
return time.Now()
}
return s.timeOfLastChannel
}
// GetStreamPosition returns last stream position which the UserStream was
// notified about
func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
return s.userStream.pos
}
// GetNotifyChannel returns a channel that is closed when there may be an
// update for the user.
// sincePos specifies from which point we want to be notified about. If there
// has already been an update after sincePos we'll return a closed channel
// immediately.
func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
if sincePos < s.userStream.pos {
// If the listener is behind, i.e. missed a potential update, then we
// want them to wake up immediately. We do this by returning a new
// closed stream, which returns immediately when selected.
closedChannel := make(chan struct{})
close(closedChannel)
return closedChannel
}
return s.userStream.signalChannel
}
// Close cleans up resources used
func (s *UserStreamListener) Close() {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
if !s.hasClosed {
s.userStream.numWaiting--
s.userStream.timeOfLastChannel = time.Now()
}
s.hasClosed = true
}