Optimise servers to backfill from (#1485)
- Prefer perspective servers if they are in the room. - Limit the number of backfill servers to 5 to avoid taking too long.
This commit is contained in:
parent
4feff8e8d9
commit
0f7e707f39
3 changed files with 58 additions and 22 deletions
|
@ -26,28 +26,30 @@ type RoomserverInternalAPI struct {
|
|||
*perform.Leaver
|
||||
*perform.Publisher
|
||||
*perform.Backfiller
|
||||
DB storage.Database
|
||||
Cfg *config.RoomServer
|
||||
Producer sarama.SyncProducer
|
||||
Cache caching.RoomServerCaches
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
KeyRing gomatrixserverlib.JSONVerifier
|
||||
fsAPI fsAPI.FederationSenderInternalAPI
|
||||
OutputRoomEventTopic string // Kafka topic for new output room events
|
||||
DB storage.Database
|
||||
Cfg *config.RoomServer
|
||||
Producer sarama.SyncProducer
|
||||
Cache caching.RoomServerCaches
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
KeyRing gomatrixserverlib.JSONVerifier
|
||||
fsAPI fsAPI.FederationSenderInternalAPI
|
||||
OutputRoomEventTopic string // Kafka topic for new output room events
|
||||
PerspectiveServerNames []gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
func NewRoomserverAPI(
|
||||
cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer,
|
||||
outputRoomEventTopic string, caches caching.RoomServerCaches,
|
||||
keyRing gomatrixserverlib.JSONVerifier,
|
||||
keyRing gomatrixserverlib.JSONVerifier, perspectiveServerNames []gomatrixserverlib.ServerName,
|
||||
) *RoomserverInternalAPI {
|
||||
serverACLs := acls.NewServerACLs(roomserverDB)
|
||||
a := &RoomserverInternalAPI{
|
||||
DB: roomserverDB,
|
||||
Cfg: cfg,
|
||||
Cache: caches,
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
KeyRing: keyRing,
|
||||
DB: roomserverDB,
|
||||
Cfg: cfg,
|
||||
Cache: caches,
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
PerspectiveServerNames: perspectiveServerNames,
|
||||
KeyRing: keyRing,
|
||||
Queryer: &query.Queryer{
|
||||
DB: roomserverDB,
|
||||
Cache: caches,
|
||||
|
@ -105,6 +107,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
|
|||
DB: r.DB,
|
||||
FSAPI: r.fsAPI,
|
||||
KeyRing: r.KeyRing,
|
||||
// Perspective servers are trusted to not lie about server keys, so we will also
|
||||
// prefer these servers when backfilling (assuming they are in the room) rather
|
||||
// than trying random servers
|
||||
PreferServers: r.PerspectiveServerNames,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,11 +30,19 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// the max number of servers to backfill from per request. If this is too low we may fail to backfill when
|
||||
// we could've from another server. If this is too high we may take far too long to successfully backfill
|
||||
// as we try dead servers.
|
||||
const maxBackfillServers = 5
|
||||
|
||||
type Backfiller struct {
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
DB storage.Database
|
||||
FSAPI federationSenderAPI.FederationSenderInternalAPI
|
||||
KeyRing gomatrixserverlib.JSONVerifier
|
||||
|
||||
// The servers which should be preferred above other servers when backfilling
|
||||
PreferServers []gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
// PerformBackfill implements api.RoomServerQueryAPI
|
||||
|
@ -96,7 +104,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
|
|||
if info == nil || info.IsStub {
|
||||
return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID)
|
||||
}
|
||||
requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities)
|
||||
requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities, r.PreferServers)
|
||||
// Request 100 items regardless of what the query asks for.
|
||||
// We don't want to go much higher than this.
|
||||
// We can't honour exactly the limit as some sytests rely on requesting more for tests to pass
|
||||
|
@ -215,10 +223,11 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
|
|||
|
||||
// backfillRequester implements gomatrixserverlib.BackfillRequester
|
||||
type backfillRequester struct {
|
||||
db storage.Database
|
||||
fsAPI federationSenderAPI.FederationSenderInternalAPI
|
||||
thisServer gomatrixserverlib.ServerName
|
||||
bwExtrems map[string][]string
|
||||
db storage.Database
|
||||
fsAPI federationSenderAPI.FederationSenderInternalAPI
|
||||
thisServer gomatrixserverlib.ServerName
|
||||
preferServer map[gomatrixserverlib.ServerName]bool
|
||||
bwExtrems map[string][]string
|
||||
|
||||
// per-request state
|
||||
servers []gomatrixserverlib.ServerName
|
||||
|
@ -226,7 +235,14 @@ type backfillRequester struct {
|
|||
eventIDMap map[string]gomatrixserverlib.Event
|
||||
}
|
||||
|
||||
func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester {
|
||||
func newBackfillRequester(
|
||||
db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName,
|
||||
bwExtrems map[string][]string, preferServers []gomatrixserverlib.ServerName,
|
||||
) *backfillRequester {
|
||||
preferServer := make(map[gomatrixserverlib.ServerName]bool)
|
||||
for _, p := range preferServers {
|
||||
preferServer[p] = true
|
||||
}
|
||||
return &backfillRequester{
|
||||
db: db,
|
||||
fsAPI: fsAPI,
|
||||
|
@ -234,6 +250,7 @@ func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.Federat
|
|||
eventIDToBeforeStateIDs: make(map[string][]string),
|
||||
eventIDMap: make(map[string]gomatrixserverlib.Event),
|
||||
bwExtrems: bwExtrems,
|
||||
preferServer: preferServer,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -436,8 +453,16 @@ FindSuccessor:
|
|||
if server == b.thisServer {
|
||||
continue
|
||||
}
|
||||
servers = append(servers, server)
|
||||
if b.preferServer[server] { // insert at the front
|
||||
servers = append([]gomatrixserverlib.ServerName{server}, servers...)
|
||||
} else { // insert at the back
|
||||
servers = append(servers, server)
|
||||
}
|
||||
}
|
||||
if len(servers) > maxBackfillServers {
|
||||
servers = servers[:maxBackfillServers]
|
||||
}
|
||||
|
||||
b.servers = servers
|
||||
return servers
|
||||
}
|
||||
|
|
|
@ -41,6 +41,11 @@ func NewInternalAPI(
|
|||
) api.RoomserverInternalAPI {
|
||||
cfg := &base.Cfg.RoomServer
|
||||
|
||||
var perspectiveServerNames []gomatrixserverlib.ServerName
|
||||
for _, kp := range base.Cfg.ServerKeyAPI.KeyPerspectives {
|
||||
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
|
||||
}
|
||||
|
||||
roomserverDB, err := storage.Open(&cfg.Database, base.Caches)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to connect to room server db")
|
||||
|
@ -48,6 +53,6 @@ func NewInternalAPI(
|
|||
|
||||
return internal.NewRoomserverAPI(
|
||||
cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
|
||||
base.Caches, keyRing,
|
||||
base.Caches, keyRing, perspectiveServerNames,
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue