From 49c040c89f92894aa8839b27c87fb569f37e398e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Oct 2017 11:34:54 +0100 Subject: [PATCH] Refactor Notifier to return channel (#311) This has two benefits: 1. Using channels makes it easier to time out while waiting 2. Allows us to clean up goroutines that were waiting if we timeout the request --- .../dendrite/syncapi/sync/notifier.go | 60 ++++--- .../dendrite/syncapi/sync/notifier_test.go | 15 +- .../dendrite/syncapi/sync/requestpool.go | 26 +-- .../dendrite/syncapi/sync/userstream.go | 155 ++++++++++++++---- 4 files changed, 170 insertions(+), 86 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go index e0e5891d..4712a2c7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -17,11 +17,12 @@ package sync import ( "context" "sync" + "time" - log "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" ) // Notifier will wake up sleeping requests when there is some new data. @@ -38,6 +39,8 @@ type Notifier struct { currPos types.StreamPosition // A map of user_id => UserStream which can be used to wake a given user's /sync request. userStreams map[string]*UserStream + // The last time we cleaned out stale entries from the userStreams map + lastCleanUpTime time.Time } // NewNotifier creates a new notifier set to the given stream position. @@ -49,6 +52,7 @@ func NewNotifier(pos types.StreamPosition) *Notifier { roomIDToJoinedUsers: make(map[string]userIDSet), userStreams: make(map[string]*UserStream), streamLock: &sync.Mutex{}, + lastCleanUpTime: time.Now(), } } @@ -63,6 +67,8 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty defer n.streamLock.Unlock() n.currPos = pos + n.removeEmptyUserStreams() + if ev != nil { // Map this event's room_id to a list of joined users, and wake them up. userIDs := n.joinedUsers(ev.RoomID()) @@ -100,10 +106,10 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty } } -// WaitForEvents blocks until there are events for this request after sincePos. -// In particular, it will return immediately if there are already events after -// sincePos for the request, but otherwise blocks waiting for new events. -func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) types.StreamPosition { +// GetListener returns a UserStreamListener that can be used to wait for +// updates for a user. Must be closed. +// notify for anything before sincePos +func (n *Notifier) GetListener(req syncRequest) UserStreamListener { // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID // - Incoming events wake requests for a matching room ID @@ -112,25 +118,12 @@ func (n *Notifier) WaitForEvents(req syncRequest, sincePos types.StreamPosition) // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, // but given we don't do /events, let's pretend it doesn't exist. - // In a guard, check if the /sync request should block, and block it until we get woken up n.streamLock.Lock() - currentPos := n.currPos + defer n.streamLock.Unlock() - // TODO: We increment the stream position for any event, so it's possible that we return immediately - // with a pos which contains no new events for this user. We should probably re-wait for events - // automatically in this case. - if sincePos != currentPos { - n.streamLock.Unlock() - return currentPos - } + n.removeEmptyUserStreams() - // wait to be woken up, and then re-check the stream position - req.log.WithField("user_id", req.userID).Info("Waiting for event") - - // give up the stream lock prior to waiting on the user lock - stream := n.fetchUserStream(req.userID, true) - n.streamLock.Unlock() - return stream.Wait(currentPos) + return n.fetchUserStream(req.userID, true).GetListener(req.ctx) } // Load the membership states required to notify users correctly. @@ -178,7 +171,7 @@ func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStr stream, ok := n.userStreams[userID] if !ok && makeIfNotExists { // TODO: Unbounded growth of streams (1 per user) - stream = NewUserStream(userID) + stream = NewUserStream(userID, n.currPos) n.userStreams[userID] = stream } return stream @@ -208,6 +201,29 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { return n.roomIDToJoinedUsers[roomID].values() } +// removeEmptyUserStreams iterates through the user stream map and removes any +// that have been empty for a certain amount of time. This is a crude way of +// ensuring that the userStreams map doesn't grow forver. +// This should be called when the notifier gets called for whatever reason, +// the function itself is responsible for ensuring it doesn't iterate too +// often. +// NB: Callers should have locked the mutex before calling this function. +func (n *Notifier) removeEmptyUserStreams() { + // Only clean up now and again + now := time.Now() + if n.lastCleanUpTime.Add(time.Minute).After(now) { + return + } + n.lastCleanUpTime = now + + deleteBefore := now.Add(-5 * time.Minute) + for key, value := range n.userStreams { + if value.TimeOfLastNonEmpty().Before(deleteBefore) { + delete(n.userStreams, key) + } + } +} + // A string set, mainly existing for improving clarity of structs in this file. type userIDSet map[string]bool diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 7aab417b..6ee25968 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -256,24 +256,22 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { // same as Notifier.WaitForEvents but with a timeout. func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) { - done := make(chan types.StreamPosition, 1) - go func() { - newPos := n.WaitForEvents(req, req.since) - done <- newPos - close(done) - }() + listener := n.GetListener(req) + defer listener.Close() + select { case <-time.After(5 * time.Second): return types.StreamPosition(0), fmt.Errorf( "waitForEvents timed out waiting for %s (pos=%d)", req.userID, req.since, ) - case p := <-done: + case <-listener.GetNotifyChannel(req.since): + p := listener.GetStreamPosition() return p, nil } } // Wait until something is Wait()ing on the user stream. -func waitForBlocking(s *UserStream, numBlocking int) { +func waitForBlocking(s *UserStream, numBlocking uint) { for numBlocking != s.NumWaiting() { // This is horrible but I don't want to add a signalling mechanism JUST for testing. time.Sleep(1 * time.Microsecond) @@ -288,5 +286,6 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { wantFullState: false, limit: defaultTimelineLimit, log: util.GetLogger(context.TODO()), + ctx: context.TODO(), } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 81469087..ebfb140d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -19,7 +19,6 @@ import ( "net/http" "time" - log "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -28,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + log "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync @@ -82,10 +82,14 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above defer timer.Stop() + userStreamListener := rp.notifier.GetListener(*syncReq) + defer userStreamListener.Close() + for { select { // Wait for notifier to wake us up - case currPos = <-rp.makeNotifyChannel(*syncReq, currPos): + case <-userStreamListener.GetNotifyChannel(currPos): + currPos = userStreamListener.GetStreamPosition() // Or for timeout to expire case <-timer.C: return util.JSONResponse{ @@ -116,24 +120,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype } } -// makeNotifyChannel returns a channel that produces the current stream position -// when there *may* be something to return to the client. Only produces a single -// value and then closes the channel. -func (rp *RequestPool) makeNotifyChannel(syncReq syncRequest, sincePos types.StreamPosition) chan types.StreamPosition { - notified := make(chan types.StreamPosition) - - // TODO(#303): We need to ensure that WaitForEvents gets properly cancelled - // when the request is finished, or use some other mechanism to ensure we - // don't leak goroutines here - go (func() { - currentPos := rp.notifier.WaitForEvents(syncReq, sincePos) - notified <- currentPos - close(notified) - })() - - return notified -} - type stateEventInStateResp struct { gomatrixserverlib.ClientEvent PrevContent json.RawMessage `json:"prev_content,omitempty"` diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go index 349b3e27..77d09c20 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/userstream.go @@ -15,65 +15,148 @@ package sync import ( + "context" + "runtime" "sync" + "time" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/util" ) // UserStream represents a communication mechanism between the /sync request goroutine -// and the underlying sync server goroutines. Goroutines can Wait() for a stream position and -// goroutines can Broadcast(streamPosition) to other goroutines. +// and the underlying sync server goroutines. +// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast() +// updates. type UserStream struct { UserID string - // Because this is a Cond, we can notify all waiting goroutines so this works - // across devices for the same user. Protects pos. - cond *sync.Cond - // The position to broadcast to callers of Wait(). + // The lock that protects changes to this struct + lock sync.Mutex + // Closed when there is an update. + signalChannel chan struct{} + // The last stream position that there may have been an update for the suser pos types.StreamPosition - // The number of goroutines blocked on Wait() - used for testing and metrics - numWaiting int + // The last time when we had some listeners waiting + timeOfLastChannel time.Time + // The number of listeners waiting + numWaiting uint +} + +// UserStreamListener allows a sync request to wait for updates for a user. +type UserStreamListener struct { + userStream *UserStream + + // Whether the stream has been closed + hasClosed bool } // NewUserStream creates a new user stream -func NewUserStream(userID string) *UserStream { +func NewUserStream(userID string, currPos types.StreamPosition) *UserStream { return &UserStream{ - UserID: userID, - cond: sync.NewCond(&sync.Mutex{}), + UserID: userID, + timeOfLastChannel: time.Now(), + pos: currPos, + signalChannel: make(chan struct{}), } } -// Wait blocks until there is a new stream position for this user, which is then returned. -// waitAtPos should be the position the stream thinks it should be waiting at. -func (s *UserStream) Wait(waitAtPos types.StreamPosition) (pos types.StreamPosition) { - s.cond.L.Lock() - // Before we start blocking, we need to make sure that we didn't race with a call - // to Broadcast() between calling Wait() and actually sleeping. We check the last - // broadcast pos to see if it is newer than the pos we are meant to wait at. If it - // is newer, something has Broadcast to this stream more recently so return immediately. - if s.pos > waitAtPos { - pos = s.pos - s.cond.L.Unlock() - return +// GetListener returns UserStreamListener that a sync request can use to wait +// for new updates with. +// UserStreamListener must be closed +func (s *UserStream) GetListener(ctx context.Context) UserStreamListener { + s.lock.Lock() + defer s.lock.Unlock() + + s.numWaiting++ // We decrement when UserStreamListener is closed + + listener := UserStreamListener{ + userStream: s, } - s.numWaiting++ - s.cond.Wait() - pos = s.pos - s.numWaiting-- - s.cond.L.Unlock() - return + + // Lets be a bit paranoid here and check that Close() is being called + runtime.SetFinalizer(&listener, func(l *UserStreamListener) { + if !l.hasClosed { + util.GetLogger(ctx).Warn("Didn't call Close on UserStreamListener") + l.Close() + } + }) + + return listener } // Broadcast a new stream position for this user. func (s *UserStream) Broadcast(pos types.StreamPosition) { - s.cond.L.Lock() + s.lock.Lock() + defer s.lock.Unlock() + s.pos = pos - s.cond.L.Unlock() - s.cond.Broadcast() + + close(s.signalChannel) + + s.signalChannel = make(chan struct{}) } -// NumWaiting returns the number of goroutines waiting for Wait() to return. Used for metrics and testing. -func (s *UserStream) NumWaiting() int { - s.cond.L.Lock() - defer s.cond.L.Unlock() +// NumWaiting returns the number of goroutines waiting for waiting for updates. +// Used for metrics and testing. +func (s *UserStream) NumWaiting() uint { + s.lock.Lock() + defer s.lock.Unlock() return s.numWaiting } + +// TimeOfLastNonEmpty returns the last time that the number of waiting listeners +// was non-empty, may be time.Now() if number of waiting listeners is currently +// non-empty. +func (s *UserStream) TimeOfLastNonEmpty() time.Time { + s.lock.Lock() + defer s.lock.Unlock() + + if s.numWaiting > 0 { + return time.Now() + } + + return s.timeOfLastChannel +} + +// GetStreamPosition returns last stream position which the UserStream was +// notified about +func (s *UserStreamListener) GetStreamPosition() types.StreamPosition { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + return s.userStream.pos +} + +// GetNotifyChannel returns a channel that is closed when there may be an +// update for the user. +// sincePos specifies from which point we want to be notified about. If there +// has already been an update after sincePos we'll return a closed channel +// immediately. +func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + if sincePos < s.userStream.pos { + // If the listener is behind, i.e. missed a potential update, then we + // want them to wake up immediately. We do this by returning a new + // closed stream, which returns immediately when selected. + closedChannel := make(chan struct{}) + close(closedChannel) + return closedChannel + } + + return s.userStream.signalChannel +} + +// Close cleans up resources used +func (s *UserStreamListener) Close() { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + if !s.hasClosed { + s.userStream.numWaiting-- + s.userStream.timeOfLastChannel = time.Now() + } + + s.hasClosed = true +}