diff --git a/go.mod b/go.mod index e367b87a..d2116fcd 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d + github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 9aa03909..81f6f0a4 100644 --- a/go.sum +++ b/go.sum @@ -424,6 +424,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d h1:zYk/bQ5bmHDsRqHBl57aBxo5bizsknWU3sunZf9WnWI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200804124807-5012a626de1d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d h1:BsSge1plJoqMyL5xnUFuTkzMXC42GbJW1kMbMizTmL4= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200806145220-3120f1087f6d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f/go.mod h1:y0oDTjZDv5SM9a2rp3bl+CU+bvTRINQsdb7YlDql5Go= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go new file mode 100644 index 00000000..19d8463d --- /dev/null +++ b/keyserver/internal/device_list_update.go @@ -0,0 +1,298 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "encoding/json" + "fmt" + "hash/fnv" + "sync" + "time" + + "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/keyserver/producers" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/sirupsen/logrus" +) + +// DeviceListUpdater handles device list updates from remote servers. +// +// In the case where we have the prev_id for an update, the updater just stores the update (after acquiring a per-user lock). +// In the case where we do not have the prev_id for an update, the updater marks the user_id as stale and notifies +// a worker to get the latest device list for this user. Note: stream IDs are scoped per user so missing a prev_id +// for a (user, device) does not mean that DEVICE is outdated as the previous ID could be for a different device: +// we have to invalidate all devices for that user. Once the list has been fetched, the per-user lock is acquired and the +// updater stores the latest list along with the latest stream ID. +// +// On startup, the updater spins up N workers which are responsible for querying device keys from remote servers. +// Workers are scoped by homeserver domain, with one worker responsible for many domains, determined by hashing +// mod N the server name. Work is sent via a channel which just serves to "poke" the worker as the data is retrieved +// from the database (which allows us to batch requests to the same server). This has a number of desirable properties: +// - We guarantee only 1 in-flight /keys/query request per server at any time as there is exactly 1 worker responsible +// for that domain. +// - We don't have unbounded growth in proportion to the number of servers (this is more important in a P2P world where +// we have many many servers) +// - We can adjust concurrency (at the cost of memory usage) by tuning N, to accommodate mobile devices vs servers. +// The downsides are that: +// - Query requests can get queued behind other servers if they hash to the same worker, even if there are other free +// workers elsewhere. Whilst suboptimal, provided we cap how long a single request can last (e.g using context timeouts) +// we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests +// (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse +// than being stuck behind foo.bar +// In the event that the query fails, the worker spins up a short-lived goroutine whose sole purpose is to inject the server +// name back into the channel after a certain amount of time. If in the interim the device lists have been updated, then +// the database query will return no stale lists. Reinjection into the channel continues until success or the server terminates, +// when it will be reloaded on startup. +type DeviceListUpdater struct { + // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 + // request to the remote server and race. + // TODO: Put in an LRU cache to bound growth + userIDToMutex map[string]*sync.Mutex + mu *sync.Mutex // protects UserIDToMutex + + db DeviceListUpdaterDatabase + producer *producers.KeyChange + fedClient *gomatrixserverlib.FederationClient + workerChans []chan gomatrixserverlib.ServerName +} + +// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater. +// Useful for testing. +type DeviceListUpdaterDatabase interface { + // StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. + // If no domains are given, all user IDs with stale device lists are returned. + StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) + + // MarkDeviceListStale sets the stale bit for this user to isStale. + MarkDeviceListStale(ctx context.Context, userID string, isStale bool) error + + // StoreRemoteDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key + // for this (user, device). Does not modify the stream ID for keys. + StoreRemoteDeviceKeys(ctx context.Context, keys []api.DeviceMessage) error + + // PrevIDsExists returns true if all prev IDs exist for this user. + PrevIDsExists(ctx context.Context, userID string, prevIDs []int) (bool, error) +} + +// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. +func NewDeviceListUpdater( + db DeviceListUpdaterDatabase, producer *producers.KeyChange, fedClient *gomatrixserverlib.FederationClient, + numWorkers int, +) *DeviceListUpdater { + return &DeviceListUpdater{ + userIDToMutex: make(map[string]*sync.Mutex), + mu: &sync.Mutex{}, + db: db, + producer: producer, + fedClient: fedClient, + workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + } +} + +// Start the device list updater, which will try to refresh any stale device lists. +func (u *DeviceListUpdater) Start() error { + for i := 0; i < len(u.workerChans); i++ { + // Allocate a small buffer per channel. + // If the buffer limit is reached, backpressure will cause the processing of EDUs + // to stop (in this transaction) until key requests can be made. + ch := make(chan gomatrixserverlib.ServerName, 10) + u.workerChans[i] = ch + go u.worker(ch) + } + + staleLists, err := u.db.StaleDeviceLists(context.Background(), []gomatrixserverlib.ServerName{}) + if err != nil { + return err + } + for _, userID := range staleLists { + u.notifyWorkers(userID) + } + return nil +} + +func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex { + u.mu.Lock() + defer u.mu.Unlock() + if u.userIDToMutex[userID] == nil { + u.userIDToMutex[userID] = &sync.Mutex{} + } + return u.userIDToMutex[userID] +} + +func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error { + isDeviceListStale, err := u.update(ctx, event) + if err != nil { + return err + } + if isDeviceListStale { + // poke workers to handle stale device lists + u.notifyWorkers(event.UserID) + } + return nil +} + +func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) (bool, error) { + mu := u.mutex(event.UserID) + mu.Lock() + defer mu.Unlock() + // check if we have the prev IDs + exists, err := u.db.PrevIDsExists(ctx, event.UserID, event.PrevID) + if err != nil { + return false, fmt.Errorf("failed to check prev IDs exist for %s (%s): %w", event.UserID, event.DeviceID, err) + } + util.GetLogger(ctx).WithFields(logrus.Fields{ + "prev_ids_exist": exists, + "user_id": event.UserID, + "device_id": event.DeviceID, + "stream_id": event.StreamID, + "prev_ids": event.PrevID, + }).Info("DeviceListUpdater.Update") + + // if we haven't missed anything update the database and notify users + if exists { + keys := []api.DeviceMessage{ + { + DeviceKeys: api.DeviceKeys{ + DeviceID: event.DeviceID, + DisplayName: event.DeviceDisplayName, + KeyJSON: event.Keys, + UserID: event.UserID, + }, + StreamID: event.StreamID, + }, + } + err = u.db.StoreRemoteDeviceKeys(ctx, keys) + if err != nil { + return false, fmt.Errorf("failed to store remote device keys for %s (%s): %w", event.UserID, event.DeviceID, err) + } + // ALWAYS emit key changes when we've been poked over federation even if there's no change + // just in case this poke is important for something. + err = u.producer.ProduceKeyChanges(keys) + if err != nil { + return false, fmt.Errorf("failed to produce device key changes for %s (%s): %w", event.UserID, event.DeviceID, err) + } + return false, nil + } + + err = u.db.MarkDeviceListStale(ctx, event.UserID, true) + if err != nil { + return false, fmt.Errorf("failed to mark device list for %s as stale: %w", event.UserID, err) + } + + return true, nil +} + +func (u *DeviceListUpdater) notifyWorkers(userID string) { + _, remoteServer, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return + } + hash := fnv.New32a() + _, _ = hash.Write([]byte(remoteServer)) + index := int(hash.Sum32()) % len(u.workerChans) + u.workerChans[index] <- remoteServer +} + +func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { + // It's possible to get many of the same server name in the channel, so in order + // to prevent processing the same server over and over we keep track of when we + // last made a request to the server. If we get the server name during the cooloff + // period, we'll ignore the poke. + lastProcessed := make(map[gomatrixserverlib.ServerName]time.Time) + cooloffPeriod := time.Minute + shouldProcess := func(srv gomatrixserverlib.ServerName) bool { + // we should process requests when now is after the last process time + cooloff + return time.Now().After(lastProcessed[srv].Add(cooloffPeriod)) + } + + // on failure, spin up a short-lived goroutine to inject the server name again. + inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) { + time.Sleep(duration) + ch <- srv + } + + for serverName := range ch { + if !shouldProcess(serverName) { + // do not inject into the channel as we know there will be a sleeping goroutine + // which will do it after the cooloff period expires + continue + } + lastProcessed[serverName] = time.Now() + shouldRetry := u.processServer(serverName) + if shouldRetry { + go inject(serverName, cooloffPeriod) // TODO: Backoff? + } + } +} + +func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) bool { + requestTimeout := time.Minute // max amount of time we want to spend on each request + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + defer cancel() + logger := util.GetLogger(ctx).WithField("server_name", serverName) + // fetch stale device lists + userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName}) + if err != nil { + logger.WithError(err).Error("failed to load stale device lists") + return true + } + hasFailures := false + for _, userID := range userIDs { + if ctx.Err() != nil { + // we've timed out, give up and go to the back of the queue to let another server be processed. + hasFailures = true + break + } + res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) + if err != nil { + logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") + hasFailures = true + continue + } + err = u.updateDeviceList(ctx, &res) + if err != nil { + logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store it") + hasFailures = true + } + } + return hasFailures +} + +func (u *DeviceListUpdater) updateDeviceList(ctx context.Context, res *gomatrixserverlib.RespUserDevices) error { + keys := make([]api.DeviceMessage, len(res.Devices)) + for i, device := range res.Devices { + keyJSON, err := json.Marshal(device.Keys) + if err != nil { + util.GetLogger(ctx).WithField("keys", device.Keys).Error("failed to marshal keys, skipping device") + continue + } + keys[i] = api.DeviceMessage{ + StreamID: res.StreamID, + DeviceKeys: api.DeviceKeys{ + DeviceID: device.DeviceID, + DisplayName: device.DisplayName, + UserID: res.UserID, + KeyJSON: keyJSON, + }, + } + } + err := u.db.StoreRemoteDeviceKeys(ctx, keys) + if err != nil { + return err + } + return u.db.MarkDeviceListStale(ctx, res.UserID, false) +} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index d6e24566..ff298c07 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -38,74 +38,22 @@ type KeyInternalAPI struct { FedClient *gomatrixserverlib.FederationClient UserAPI userapi.UserInternalAPI Producer *producers.KeyChange - // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 - // request to the remote server and race. - // TODO: Put in an LRU cache to bound growth - UserIDToMutex map[string]*sync.Mutex - Mutex *sync.Mutex // protects UserIDToMutex + Updater *DeviceListUpdater } func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) { a.UserAPI = i } -func (a *KeyInternalAPI) mutex(userID string) *sync.Mutex { - a.Mutex.Lock() - defer a.Mutex.Unlock() - if a.UserIDToMutex[userID] == nil { - a.UserIDToMutex[userID] = &sync.Mutex{} - } - return a.UserIDToMutex[userID] -} - func (a *KeyInternalAPI) InputDeviceListUpdate( ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse, ) { - mu := a.mutex(req.Event.UserID) - mu.Lock() - defer mu.Unlock() - // check if we have the prev IDs - exists, err := a.DB.PrevIDsExists(ctx, req.Event.UserID, req.Event.PrevID) + err := a.Updater.Update(ctx, req.Event) if err != nil { res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to check if prev ids exist: %s", err), + Err: fmt.Sprintf("failed to update device list: %s", err), } - return } - - // if we haven't missed anything update the database and notify users - if exists { - keys := []api.DeviceMessage{ - { - DeviceKeys: api.DeviceKeys{ - DeviceID: req.Event.DeviceID, - DisplayName: req.Event.DeviceDisplayName, - KeyJSON: req.Event.Keys, - UserID: req.Event.UserID, - }, - StreamID: req.Event.StreamID, - }, - } - err = a.DB.StoreRemoteDeviceKeys(ctx, keys) - if err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to store remote device keys: %s", err), - } - return - } - // ALWAYS emit key changes when we've been poked over federation just in case - // this poke is important for something. - err = a.Producer.ProduceKeyChanges(keys) - if err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("failed to emit remote device key changes: %s", err), - } - } - return - } - - // if we're missing an ID go and fetch it from the remote HS - } func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 79d9cec9..4a6fbe3c 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -15,8 +15,6 @@ package keyserver import ( - "sync" - "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/internal/config" @@ -52,12 +50,16 @@ func NewInternalAPI( Producer: producer, DB: db, } + updater := internal.NewDeviceListUpdater(db, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable + err = updater.Start() + if err != nil { + logrus.WithError(err).Panicf("failed to start device list updater") + } return &internal.KeyInternalAPI{ - DB: db, - ThisServer: cfg.Matrix.ServerName, - FedClient: fedClient, - Producer: keyChangeProducer, - Mutex: &sync.Mutex{}, - UserIDToMutex: make(map[string]*sync.Mutex), + DB: db, + ThisServer: cfg.Matrix.ServerName, + FedClient: fedClient, + Producer: keyChangeProducer, + Updater: updater, } } diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index f67bbf71..2a60aacc 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -19,6 +19,7 @@ import ( "encoding/json" "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/gomatrixserverlib" ) type Database interface { @@ -64,4 +65,11 @@ type Database interface { // A to offset of sarama.OffsetNewest means no upper limit. // Returns the offset of the latest key change. KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) + + // StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. + // If no domains are given, all user IDs with stale device lists are returned. + StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) + + // MarkDeviceListStale sets the stale bit for this user to isStale. + MarkDeviceListStale(ctx context.Context, userID string, isStale bool) error } diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go index 78729774..68964be6 100644 --- a/keyserver/storage/shared/storage.go +++ b/keyserver/storage/shared/storage.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/storage/tables" + "github.com/matrix-org/gomatrixserverlib" ) type Database struct { @@ -124,3 +125,14 @@ func (d *Database) StoreKeyChange(ctx context.Context, partition int32, offset i func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) { return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset, toOffset) } + +// StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists. +// If no domains are given, all user IDs with stale device lists are returned. +func (d *Database) StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) { + return nil, nil // TODO +} + +// MarkDeviceListStale sets the stale bit for this user to isStale. +func (d *Database) MarkDeviceListStale(ctx context.Context, userID string, isStale bool) error { + return nil // TODO +}