Only emit key changes which are different from what we had before (#1279)
We did this already for local `/keys/upload` but didn't for remote `/users/devices`. This meant any resyncs would spam produce events, hammering disk i/o and spamming the logs.main
parent
a5a85c6a11
commit
02a8515e99
|
@ -91,6 +91,9 @@ type DeviceListUpdaterDatabase interface {
|
||||||
|
|
||||||
// PrevIDsExists returns true if all prev IDs exist for this user.
|
// PrevIDsExists returns true if all prev IDs exist for this user.
|
||||||
PrevIDsExists(ctx context.Context, userID string, prevIDs []int) (bool, error)
|
PrevIDsExists(ctx context.Context, userID string, prevIDs []int) (bool, error)
|
||||||
|
|
||||||
|
// DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` or `StreamID` already then it will be replaced.
|
||||||
|
DeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// KeyChangeProducer is the interface for producers.KeyChange useful for testing.
|
// KeyChangeProducer is the interface for producers.KeyChange useful for testing.
|
||||||
|
@ -354,6 +357,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {
|
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {
|
||||||
ctx := context.Background() // we've got the keys, don't time out when persisting them to the database.
|
ctx := context.Background() // we've got the keys, don't time out when persisting them to the database.
|
||||||
keys := make([]api.DeviceMessage, len(res.Devices))
|
keys := make([]api.DeviceMessage, len(res.Devices))
|
||||||
|
existingKeys := make([]api.DeviceMessage, len(res.Devices))
|
||||||
for i, device := range res.Devices {
|
for i, device := range res.Devices {
|
||||||
keyJSON, err := json.Marshal(device.Keys)
|
keyJSON, err := json.Marshal(device.Keys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -369,7 +373,21 @@ func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevi
|
||||||
KeyJSON: keyJSON,
|
KeyJSON: keyJSON,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
existingKeys[i] = api.DeviceMessage{
|
||||||
|
DeviceKeys: api.DeviceKeys{
|
||||||
|
UserID: res.UserID,
|
||||||
|
DeviceID: device.DeviceID,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
// fetch what keys we had already and only emit changes
|
||||||
|
if err := u.db.DeviceKeysJSON(ctx, existingKeys); err != nil {
|
||||||
|
// non-fatal, log and continue
|
||||||
|
util.GetLogger(ctx).WithError(err).WithField("user_id", res.UserID).Errorf(
|
||||||
|
"failed to query device keys json for calculating diffs",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
err := u.db.StoreRemoteDeviceKeys(ctx, keys, []string{res.UserID})
|
err := u.db.StoreRemoteDeviceKeys(ctx, keys, []string{res.UserID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to store remote device keys: %w", err)
|
return fmt.Errorf("failed to store remote device keys: %w", err)
|
||||||
|
@ -378,7 +396,7 @@ func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevi
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to mark device list as fresh: %w", err)
|
return fmt.Errorf("failed to mark device list as fresh: %w", err)
|
||||||
}
|
}
|
||||||
err = u.producer.ProduceKeyChanges(keys)
|
err = emitDeviceKeyChanges(u.producer, existingKeys, keys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to emit key changes for fresh device list: %w", err)
|
return fmt.Errorf("failed to emit key changes for fresh device list: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,6 +91,10 @@ func (d *mockDeviceListUpdaterDatabase) PrevIDsExists(ctx context.Context, userI
|
||||||
return d.prevIDsExist(userID, prevIDs), nil
|
return d.prevIDsExist(userID, prevIDs), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *mockDeviceListUpdaterDatabase) DeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type roundTripper struct {
|
type roundTripper struct {
|
||||||
fn func(*http.Request) (*http.Response, error)
|
fn func(*http.Request) (*http.Response, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -505,7 +505,7 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = a.emitDeviceKeyChanges(existingKeys, keysToStore)
|
err = emitDeviceKeyChanges(a.Producer, existingKeys, keysToStore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err)
|
util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -550,7 +550,7 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceMessage) error {
|
func emitDeviceKeyChanges(producer KeyChangeProducer, existing, new []api.DeviceMessage) error {
|
||||||
// find keys in new that are not in existing
|
// find keys in new that are not in existing
|
||||||
var keysAdded []api.DeviceMessage
|
var keysAdded []api.DeviceMessage
|
||||||
for _, newKey := range new {
|
for _, newKey := range new {
|
||||||
|
@ -567,7 +567,7 @@ func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceMessage)
|
||||||
keysAdded = append(keysAdded, newKey)
|
keysAdded = append(keysAdded, newKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return a.Producer.ProduceKeyChanges(keysAdded)
|
return producer.ProduceKeyChanges(keysAdded)
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendDisplayNames(existing, new []api.DeviceMessage) []api.DeviceMessage {
|
func appendDisplayNames(existing, new []api.DeviceMessage) []api.DeviceMessage {
|
||||||
|
|
Loading…
Reference in New Issue