Add sync mechanism to block when updating device lists (#1264)
* Add sync mechanism to block when updating device lists With a timeout, mainly for sytest to fix the test "Server correctly handles incoming m.device_list_update" which is flakey because it assumes that when `/send` 200 OKs that the server has updated the device lists in prep for `/keys/query` which is not always true when using workers. * Fix UT * Add new working test
This commit is contained in:
parent
0835107f5b
commit
d98ec12422
3 changed files with 46 additions and 16 deletions
|
@ -67,6 +67,11 @@ type DeviceListUpdater struct {
|
|||
producer KeyChangeProducer
|
||||
fedClient *gomatrixserverlib.FederationClient
|
||||
workerChans []chan gomatrixserverlib.ServerName
|
||||
|
||||
// When device lists are stale for a user, they get inserted into this map with a channel which `Update` will
|
||||
// block on or timeout via a select.
|
||||
userIDToChan map[string]chan bool
|
||||
userIDToChanMu *sync.Mutex
|
||||
}
|
||||
|
||||
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
|
||||
|
@ -98,12 +103,14 @@ func NewDeviceListUpdater(
|
|||
numWorkers int,
|
||||
) *DeviceListUpdater {
|
||||
return &DeviceListUpdater{
|
||||
userIDToMutex: make(map[string]*sync.Mutex),
|
||||
mu: &sync.Mutex{},
|
||||
db: db,
|
||||
producer: producer,
|
||||
fedClient: fedClient,
|
||||
workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
|
||||
userIDToMutex: make(map[string]*sync.Mutex),
|
||||
mu: &sync.Mutex{},
|
||||
db: db,
|
||||
producer: producer,
|
||||
fedClient: fedClient,
|
||||
workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
|
||||
userIDToChan: make(map[string]chan bool),
|
||||
userIDToChanMu: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,6 +144,8 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
|
|||
return u.userIDToMutex[userID]
|
||||
}
|
||||
|
||||
// Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest,
|
||||
// which assumes when /send 200 OKs that the device lists have been updated.
|
||||
func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error {
|
||||
isDeviceListStale, err := u.update(ctx, event)
|
||||
if err != nil {
|
||||
|
@ -213,7 +222,35 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
|
|||
hash := fnv.New32a()
|
||||
_, _ = hash.Write([]byte(remoteServer))
|
||||
index := int(hash.Sum32()) % len(u.workerChans)
|
||||
|
||||
ch := u.assignChannel(userID)
|
||||
u.workerChans[index] <- remoteServer
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(10 * time.Second):
|
||||
// we don't return an error in this case as it's not a failure condition.
|
||||
// we mainly block for the benefit of sytest anyway
|
||||
}
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) assignChannel(userID string) chan bool {
|
||||
u.userIDToChanMu.Lock()
|
||||
defer u.userIDToChanMu.Unlock()
|
||||
if ch, ok := u.userIDToChan[userID]; ok {
|
||||
return ch
|
||||
}
|
||||
ch := make(chan bool)
|
||||
u.userIDToChan[userID] = ch
|
||||
return ch
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) clearChannel(userID string) {
|
||||
u.userIDToChanMu.Lock()
|
||||
defer u.userIDToChanMu.Unlock()
|
||||
if ch, ok := u.userIDToChan[userID]; ok {
|
||||
close(ch)
|
||||
delete(u.userIDToChan, userID)
|
||||
}
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
|
||||
|
@ -285,6 +322,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
|||
if err != nil {
|
||||
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
|
||||
hasFailures = true
|
||||
} else {
|
||||
u.clearChannel(userID)
|
||||
}
|
||||
}
|
||||
return hasFailures
|
||||
|
|
|
@ -204,16 +204,6 @@ func TestUpdateNoPrevID(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Update returned an error: %s", err)
|
||||
}
|
||||
// At this point we show have this device list marked as stale and not store the keys or emitted anything
|
||||
if !db.staleUsers[event.UserID] {
|
||||
t.Errorf("%s not marked as stale", event.UserID)
|
||||
}
|
||||
if len(producer.events) > 0 {
|
||||
t.Errorf("Update incorrect emitted %d device change events", len(producer.events))
|
||||
}
|
||||
if len(db.storedKeys) > 0 {
|
||||
t.Errorf("Update incorrect stored %d device change events", len(db.storedKeys))
|
||||
}
|
||||
t.Log("waiting for /users/devices to be called...")
|
||||
wg.Wait()
|
||||
// wait a bit for db to be updated...
|
||||
|
|
|
@ -143,6 +143,7 @@ Device deletion propagates over federation
|
|||
If remote user leaves room, changes device and rejoins we see update in sync
|
||||
If remote user leaves room, changes device and rejoins we see update in /keys/changes
|
||||
If remote user leaves room we no longer receive device updates
|
||||
If a device list update goes missing, the server resyncs on the next one
|
||||
Get left notifs in sync and /keys/changes when other user leaves
|
||||
Can query remote device keys using POST after notification
|
||||
Can add account data
|
||||
|
|
Loading…
Reference in a new issue