diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go index 0dd8da20..a574ceff 100644 --- a/federationsender/statistics/statistics.go +++ b/federationsender/statistics/statistics.go @@ -66,10 +66,16 @@ type ServerStatistics struct { serverName gomatrixserverlib.ServerName // blacklisted atomic.Bool // is the node blacklisted backoffStarted atomic.Bool // is the backoff started + backoffUntil atomic.Value // time.Time until this backoff interval ends backoffCount atomic.Uint32 // number of times BackoffDuration has been called successCounter atomic.Uint32 // how many times have we succeeded? } +// duration returns how long the next backoff interval should be. +func (s *ServerStatistics) duration(count uint32) time.Duration { + return time.Second * time.Duration(math.Exp2(float64(count))) +} + // Success updates the server statistics with a new successful // attempt, which increases the sent counter and resets the idle and // failure counters. If a host was blacklisted at this point then @@ -88,11 +94,36 @@ func (s *ServerStatistics) Success() { // Failure marks a failure and starts backing off if needed. // The next call to BackoffIfRequired will do the right thing -// after this. -func (s *ServerStatistics) Failure() { +// after this. It returns the time until which the current failure +// causes us to back off, and a bool signalling whether the server +// has been blacklisted, in which case the caller should give up. +func (s *ServerStatistics) Failure() (time.Time, bool) { + // If we aren't already backing off, this call will start + // a new backoff period. Reset the counter to 0 so that + // we back off only for short periods of time to start with. if s.backoffStarted.CAS(false, true) { s.backoffCount.Store(0) } + + // Check if we have blacklisted this node. + if s.blacklisted.Load() { + return time.Now(), true + } + + // If we're already backing off and we haven't yet surpassed + // the deadline then return that. 
Repeated calls to Failure + // within a single backoff interval will have no side effects. + if until, ok := s.backoffUntil.Load().(time.Time); ok && !time.Now().After(until) { + return until, false + } + + // We're either backing off and have passed the deadline, or + // we aren't backing off, so work out what the next interval + // will be. + count := s.backoffCount.Load() + until := time.Now().Add(s.duration(count)) + s.backoffUntil.Store(until) + return until, false } // BackoffIfRequired will block for as long as the current @@ -102,11 +133,8 @@ func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt < return 0, false } - // Work out how many times we've backed off so far. - count := s.backoffCount.Inc() - duration := time.Second * time.Duration(math.Exp2(float64(count))) - // Work out if we should be blacklisting at this point. + count := s.backoffCount.Inc() if count >= s.statistics.FailuresUntilBlacklist { // We've exceeded the maximum amount of times we're willing // to back off, which is probably in the region of hours by @@ -118,9 +146,14 @@ func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt < logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName) } } - return duration, true + return 0, true } + // Work out when we should wait until. + duration := s.duration(count) + until := time.Now().Add(duration) + s.backoffUntil.Store(until) + // Notify the destination queue that we're backing off now. 
backingOff.Store(true) defer backingOff.Store(false) diff --git a/federationsender/statistics/statistics_test.go b/federationsender/statistics/statistics_test.go index 9050662e..7e083de6 100644 --- a/federationsender/statistics/statistics_test.go +++ b/federationsender/statistics/statistics_test.go @@ -10,7 +10,7 @@ import ( func TestBackoff(t *testing.T) { stats := Statistics{ - FailuresUntilBlacklist: 5, + FailuresUntilBlacklist: 7, } server := ServerStatistics{ statistics: &stats, @@ -41,10 +41,20 @@ func TestBackoff(t *testing.T) { // Get the duration. duration, blacklist := server.BackoffIfRequired(backingOff, interrupt) + // Register another failure for good measure. This should have no + // side effects since a backoff is already in progress. If it does + // then we'll fail. + until, blacklisted := server.Failure() + if time.Until(until) > duration { + t.Fatal("Failure produced unexpected side effect when it shouldn't have") + } + // Check if we should be blacklisted by now. - if i > stats.FailuresUntilBlacklist { + if i >= stats.FailuresUntilBlacklist { if !blacklist { t.Fatalf("Backoff %d should have resulted in blacklist but didn't", i) + } else if blacklist != blacklisted { + t.Fatalf("BackoffIfRequired and Failure returned different blacklist values") } else { t.Logf("Backoff %d is blacklisted as expected", i) continue