Defer keyserver and federationsender wakeups to give HTTP listeners time to start (#1389)
parent
33b8143a95
commit
04bc09f591
|
@ -20,6 +20,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
|
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/federationsender/statistics"
|
"github.com/matrix-org/dendrite/federationsender/statistics"
|
||||||
|
@ -65,26 +66,28 @@ func NewOutgoingQueues(
|
||||||
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
|
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
|
||||||
}
|
}
|
||||||
// Look up which servers we have pending items for and then rehydrate those queues.
|
// Look up which servers we have pending items for and then rehydrate those queues.
|
||||||
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
|
time.AfterFunc(time.Second*5, func() {
|
||||||
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
|
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
|
||||||
for _, serverName := range names {
|
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
|
||||||
serverNames[serverName] = struct{}{}
|
for _, serverName := range names {
|
||||||
|
serverNames[serverName] = struct{}{}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
|
||||||
}
|
}
|
||||||
} else {
|
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
|
||||||
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
|
for _, serverName := range names {
|
||||||
}
|
serverNames[serverName] = struct{}{}
|
||||||
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
|
}
|
||||||
for _, serverName := range names {
|
} else {
|
||||||
serverNames[serverName] = struct{}{}
|
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
||||||
}
|
}
|
||||||
} else {
|
for serverName := range serverNames {
|
||||||
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
if !queues.getQueue(serverName).statistics.Blacklisted() {
|
||||||
}
|
queues.getQueue(serverName).wakeQueueIfNeeded()
|
||||||
for serverName := range serverNames {
|
}
|
||||||
if !queues.getQueue(serverName).statistics.Blacklisted() {
|
|
||||||
queues.getQueue(serverName).wakeQueueIfNeeded()
|
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
return queues
|
return queues
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,10 +48,11 @@ func NewInternalAPI(
|
||||||
DB: db,
|
DB: db,
|
||||||
}
|
}
|
||||||
updater := internal.NewDeviceListUpdater(db, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
updater := internal.NewDeviceListUpdater(db, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||||
err = updater.Start()
|
go func() {
|
||||||
if err != nil {
|
if err := updater.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start device list updater")
|
logrus.WithError(err).Panicf("failed to start device list updater")
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
return &internal.KeyInternalAPI{
|
return &internal.KeyInternalAPI{
|
||||||
DB: db,
|
DB: db,
|
||||||
ThisServer: cfg.Matrix.ServerName,
|
ThisServer: cfg.Matrix.ServerName,
|
||||||
|
|
Loading…
Reference in New Issue