Modify DeviceListUpdater to retry requests according to RetryAfter (#1342)

* Modify DeviceListUpdater to retry requests according to RetryAfter

* Reduce wait time for sytest test pollution
main
Kegsay 2020-08-26 12:03:09 +01:00 committed by GitHub
parent 3205b9212d
commit abd16ff4a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 6 deletions

View File

@ -310,24 +310,25 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
}
}
lastProcessed[serverName] = time.Now()
shouldRetry := u.processServer(serverName)
waitTime, shouldRetry := u.processServer(serverName)
if shouldRetry {
scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
go inject(serverName, cooloffPeriod)
scheduledRetries[serverName] = time.Now().Add(waitTime)
go inject(serverName, waitTime)
}
}
}
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) bool {
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
requestTimeout := time.Minute // max amount of time we want to spend on each request
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
logger := util.GetLogger(ctx).WithField("server_name", serverName)
waitTime := 2 * time.Second
// fetch stale device lists
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
if err != nil {
logger.WithError(err).Error("failed to load stale device lists")
return true
return waitTime, true
}
hasFailures := false
for _, userID := range userIDs {
@ -339,6 +340,10 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user")
fcerr, ok := err.(*fedsenderapi.FederationClientError)
if ok && fcerr.RetryAfter > 0 {
waitTime = fcerr.RetryAfter
}
hasFailures = true
continue
}
@ -352,7 +357,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
// always clear the channel to unblock Update calls regardless of success/failure
u.clearChannel(userID)
}
return hasFailures
return waitTime, hasFailures
}
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {