From 04f3c154b872b1ac5f09e86ef19358f47983c415 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 10 May 2017 10:42:00 +0100 Subject: [PATCH] Split out notifying /sync requests and calculating sync responses (#96) * Split out notifying /sync requests and calculating sync responses The logic for notifying /sync requests is about to get really complicated as we optimise when to wake up requests, so split out that code into a separate struct to isolate it and make it easier to unit test. --- .../cmd/dendrite-sync-api-server/main.go | 10 +-- .../dendrite/syncapi/consumers/roomserver.go | 8 +-- .../dendrite/syncapi/sync/notifier.go | 67 +++++++++++++++++++ .../dendrite/syncapi/sync/requestpool.go | 44 ++---------- 4 files changed, 82 insertions(+), 47 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 750f0203..f67bf0e5 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/dendrite/syncapi/types" log "github.com/Sirupsen/logrus" yaml "gopkg.in/yaml.v2" @@ -71,12 +72,13 @@ func main() { log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - rp, err := sync.NewRequestPool(db) + pos, err := db.SyncStreamPosition() if err != nil { - log.Panicf("startup: Failed to create request pool : %s", err) + log.Panicf("startup: failed to get latest sync stream position : %s", err) } - server, err := consumers.NewServer(cfg, rp, db) + n := sync.NewNotifier(types.StreamPosition(pos)) + server, err := consumers.NewServer(cfg, n, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) } @@ -85,6 +87,6 @@ func main() { } log.Info("Starting sync server on ", *bindAddr) - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, rp) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n)) log.Fatal(http.ListenAndServe(*bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index b9737163..4d703ab3 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -32,11 +32,11 @@ import ( type Server struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase - rp *sync.RequestPool + notifier *sync.Notifier } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServerDatabase) (*Server, error) { +func NewServer(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -50,7 +50,7 @@ func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServer s := &Server{ roomServerConsumer: &consumer, db: store, - rp: rp, + notifier: n, } consumer.ProcessMessage = s.onMessage @@ -96,7 +96,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) + s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) return nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go new file mode 100644 index 00000000..cc986579 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier.go @@ -0,0 +1,67 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sync + +import ( + "sync" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// Notifier will wake up sleeping requests in the request pool when there +// is some new data. It does not tell requests what that data is, only the +// stream position which they can use to get at it. +type Notifier struct { + // The latest sync stream position: guarded by 'cond'. + currPos types.StreamPosition + // A condition variable to notify all waiting goroutines of a new sync stream position + cond *sync.Cond +} + +// NewNotifier creates a new notifier set to the given stream position. +func NewNotifier(pos types.StreamPosition) *Notifier { + return &Notifier{ + pos, + sync.NewCond(&sync.Mutex{}), + } +} + +// OnNewEvent is called when a new event is received from the room server. Must only be +// called from a single goroutine, to avoid races between updates which could set the +// current position in the stream incorrectly. +func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { + // update the current position in a guard and then notify all /sync streams + n.cond.L.Lock() + n.currPos = pos + n.cond.L.Unlock() + + n.cond.Broadcast() // notify ALL waiting goroutines +} + +// WaitForEvents blocks until there are new events for this request. +func (n *Notifier) WaitForEvents(req syncRequest) types.StreamPosition { + // In a guard, check if the /sync request should block, and block it until we get a new position + n.cond.L.Lock() + currentPos := n.currPos + for req.since == currentPos { + // we need to wait for a new event. + // TODO: This waits for ANY new event, we need to only wait for events which we care about. + n.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock + currentPos = n.currPos + } + n.cond.L.Unlock() + return currentPos +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 124922b2..eee117e7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -16,7 +16,6 @@ package sync import ( "net/http" - "sync" "time" log "github.com/Sirupsen/logrus" @@ -31,20 +30,13 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db *storage.SyncServerDatabase - // The latest sync stream position: guarded by 'cond'. - currPos types.StreamPosition - // A condition variable to notify all waiting goroutines of a new sync stream position - cond *sync.Cond + db *storage.SyncServerDatabase + notifier *Notifier } // NewRequestPool makes a new RequestPool -func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { - pos, err := db.SyncStreamPosition() - if err != nil { - return nil, err - } - return &RequestPool{db, types.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil +func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier) *RequestPool { + return &RequestPool{db, n} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -106,34 +98,8 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } } -// OnNewEvent is called when a new event is received from the room server. Must only be -// called from a single goroutine, to avoid races between updates which could set the -// current position in the stream incorrectly. -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { - // update the current position in a guard and then notify all /sync streams - rp.cond.L.Lock() - rp.currPos = pos - rp.cond.L.Unlock() - - rp.cond.Broadcast() // notify ALL waiting goroutines -} - -func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition { - // In a guard, check if the /sync request should block, and block it until we get a new position - rp.cond.L.Lock() - currentPos := rp.currPos - for req.since == currentPos { - // we need to wait for a new event. - // TODO: This waits for ANY new event, we need to only wait for events which we care about. - rp.cond.Wait() // atomically unlocks and blocks goroutine, then re-acquires lock on unblock - currentPos = rp.currPos - } - rp.cond.L.Unlock() - return currentPos -} - func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { - currentPos := rp.waitForEvents(req) + currentPos := rp.notifier.WaitForEvents(req) if req.since == types.StreamPosition(0) { pos, data, err := rp.db.CompleteSync(req.userID, req.limit)