From 49abe359e6a2b0c3f214190b73404c5cf9a0e051 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 15 Oct 2020 13:27:13 +0100 Subject: [PATCH] Start Kafka connections for each component that needs them (#1527) * Start Kafka connection for each component that needs one * Fix roomserver unit tests * Rename to naffkaInstance (@Kegsay review comment) * Fix import cycle --- appservice/appservice.go | 5 ++- build/docker/docker-compose.deps.yml | 2 + build/gobind/monolith.go | 14 +++---- clientapi/clientapi.go | 5 ++- cmd/dendrite-client-api-server/main.go | 2 +- cmd/dendrite-demo-libp2p/main.go | 14 +++---- cmd/dendrite-demo-yggdrasil/main.go | 14 +++---- cmd/dendrite-key-server/main.go | 2 +- cmd/dendrite-monolith-server/main.go | 14 +++---- cmd/dendrite-sync-api-server/main.go | 3 +- cmd/dendritejs/main.go | 14 +++---- eduserver/eduserver.go | 6 ++- federationsender/federationsender.go | 9 +++-- internal/setup/base.go | 46 +--------------------- internal/setup/kafka/kafka.go | 53 ++++++++++++++++++++++++++ internal/setup/monolith.go | 17 ++++----- keyserver/keyserver.go | 6 ++- roomserver/roomserver.go | 5 ++- roomserver/roomserver_test.go | 22 ++++++++--- syncapi/syncapi.go | 5 ++- 20 files changed, 143 insertions(+), 115 deletions(-) create mode 100644 internal/setup/kafka/kafka.go diff --git a/appservice/appservice.go b/appservice/appservice.go index e356f68e..cf9a47b7 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -47,6 +48,8 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { + consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) + // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) if err != nil { @@ -86,7 +89,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, appserviceDB, + base.Cfg, consumer, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { diff --git a/build/docker/docker-compose.deps.yml b/build/docker/docker-compose.deps.yml index afc572d0..74e478a8 100644 --- a/build/docker/docker-compose.deps.yml +++ b/build/docker/docker-compose.deps.yml @@ -29,6 +29,8 @@ services: KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_DELETE_TOPIC_ENABLE: "true" KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + ports: + - 9092:9092 depends_on: - zookeeper networks: diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 7d10b87e..fd010809 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -112,7 +112,7 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -146,13 +146,11 @@ func (m *DendriteMonolith) Start() { rsAPI.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 2ab92ed4..ebe55aec 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -15,7 +15,6 @@ package clientapi import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/api" @@ -24,6 +23,7 @@ import ( eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -36,7 +36,6 @@ import ( func AddPublicRoutes( router *mux.Router, cfg *config.ClientAPI, - producer sarama.SyncProducer, accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, @@ -48,6 +47,8 @@ func AddPublicRoutes( keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, ) { + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 0fdc6679..0061de74 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -37,7 +37,7 @@ func main() { keyAPI := base.KeyServerHTTPClient() clientapi.AddPublicRoutes( - base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation, + base.PublicClientAPIMux, &base.Cfg.ClientAPI, accountDB, federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil, ) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index b5386325..61fdd801 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -139,7 +139,7 @@ func main() { accountDB := base.Base.CreateAccountsDB() federation := createFederationClient(base) - keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation, base.Base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -169,13 +169,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Base.Cfg, - AccountDB: accountDB, - Client: createClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.Base.KafkaConsumer, - KafkaProducer: base.Base.KafkaProducer, + Config: base.Base.Cfg, + AccountDB: accountDB, + Client: createClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 5e8b9231..a4097363 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -96,7 +96,7 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -129,13 +129,11 @@ func main() { rsComponent.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: ygg.CreateClient(base), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go index 92d18ac3..ff5b2223 100644 --- a/cmd/dendrite-key-server/main.go +++ b/cmd/dendrite-key-server/main.go @@ -24,7 +24,7 @@ func main() { base := setup.NewBaseDendrite(cfg, "KeyServer", true) defer base.Close() // nolint: errcheck - intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient(), base.KafkaProducer) + intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, base.CreateFederationClient()) intAPI.SetUserAPI(base.UserAPIClient()) keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 0fe70ca8..e935805f 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -108,7 +108,7 @@ func main() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsImpl.SetFederationSenderAPI(fsAPI) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -127,13 +127,11 @@ func main() { } monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: base.CreateClient(), - FedClient: federation, - KeyRing: keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: base.CreateClient(), + FedClient: federation, + KeyRing: keyRing, AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index b879f842..351dbc5f 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -21,6 +21,7 @@ import ( func main() { cfg := setup.ParseFlags(false) + base := setup.NewBaseDendrite(cfg, "SyncAPI", true) defer base.Close() // nolint: errcheck @@ -30,7 +31,7 @@ func main() { rsAPI := base.RoomserverHTTPClient() syncapi.AddPublicRoutes( - base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI, + base.PublicClientAPIMux, userAPI, rsAPI, base.KeyServerHTTPClient(), federation, &cfg.SyncAPI, ) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 2d7f8b02..85cc8a9f 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -190,7 +190,7 @@ func main() { accountDB := base.CreateAccountsDB() federation := createFederationClient(cfg, node) - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) @@ -212,13 +212,11 @@ func main() { p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) monolith := setup.Monolith{ - Config: base.Cfg, - AccountDB: accountDB, - Client: createClient(node), - FedClient: federation, - KeyRing: &keyRing, - KafkaConsumer: base.KafkaConsumer, - KafkaProducer: base.KafkaProducer, + Config: base.Cfg, + AccountDB: accountDB, + Client: createClient(node), + FedClient: federation, + KeyRing: &keyRing, AppserviceAPI: asQuery, EDUInternalAPI: eduInputAPI, diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index b6196c26..098ac024 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/inthttp" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -41,10 +42,13 @@ func NewInternalAPI( userAPI userapi.UserInternalAPI, ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer + + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + return &input.EDUServerInputAPI{ Cache: eduCache, UserAPI: userAPI, - Producer: base.KafkaProducer, + Producer: producer, OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), ServerName: cfg.Matrix.ServerName, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 2f122328..78791140 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -55,6 +56,8 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + queues := queue.NewOutgoingQueues( federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats, @@ -66,7 +69,7 @@ func NewInternalAPI( ) rsConsumer := consumers.NewOutputRoomEventConsumer( - cfg, base.KafkaConsumer, queues, + cfg, consumer, queues, federationSenderDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { @@ -74,13 +77,13 @@ func NewInternalAPI( } tsConsumer := consumers.NewOutputEDUConsumer( - cfg, base.KafkaConsumer, queues, federationSenderDB, + cfg, consumer, queues, federationSenderDB, ) if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, rsAPI, + &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") diff --git a/internal/setup/base.go b/internal/setup/base.go index 24a0d6aa..8bc4ae17 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -26,13 +26,9 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/storage/accounts" - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" @@ -73,8 +69,8 @@ type BaseDendrite struct { httpClient *http.Client Cfg *config.Dendrite Caches *caching.Caches - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + // KafkaConsumer sarama.Consumer + // KafkaProducer sarama.SyncProducer } const HTTPServerTimeout = time.Minute * 5 @@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo logrus.WithError(err).Panicf("failed to start opentracing") } - var kafkaConsumer sarama.Consumer - var kafkaProducer sarama.SyncProducer - if cfg.Global.Kafka.UseNaffka { - kafkaConsumer, kafkaProducer = setupNaffka(cfg) - } else { - kafkaConsumer, kafkaProducer = setupKafka(cfg) - } - cache, err := caching.NewInMemoryLRUCache(true) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") @@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(), apiHttpClient: &apiClient, httpClient: &client, - KafkaConsumer: kafkaConsumer, - KafkaProducer: kafkaProducer, } } @@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP( select {} } - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naff, err := naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naff, naff -} diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go new file mode 100644 index 00000000..9855ae15 --- /dev/null +++ b/internal/setup/kafka/kafka.go @@ -0,0 +1,53 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/sirupsen/logrus" +) + +func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if cfg.UseNaffka { + return setupNaffka(cfg) + } + return setupKafka(cfg) +} + +// setupKafka creates kafka consumer/producer pair from the config. +func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to start kafka consumer") + } + + producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + if err != nil { + logrus.WithError(err).Panic("failed to setup kafka producers") + } + + return consumer, producer +} + +// In monolith mode with Naffka, we don't have the same constraints about +// consuming the same topic from more than one place like we do with Kafka. +// Therefore, we will only open one Naffka connection in case Naffka is +// running on SQLite. +var naffkaInstance *naffka.Naffka + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { + if naffkaInstance != nil { + return naffkaInstance, naffkaInstance + } + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + naffkaInstance, err = naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + return naffkaInstance, naffkaInstance +} diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index a0675d61..9d3625d2 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -15,7 +15,6 @@ package setup import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi" @@ -38,13 +37,11 @@ import ( // Monolith represents an instantiation of all dependencies required to build // all components of Dendrite, for use in monolith mode. type Monolith struct { - Config *config.Dendrite - AccountDB accounts.Database - KeyRing *gomatrixserverlib.KeyRing - Client *gomatrixserverlib.Client - FedClient *gomatrixserverlib.FederationClient - KafkaConsumer sarama.Consumer - KafkaProducer sarama.SyncProducer + Config *config.Dendrite + AccountDB accounts.Database + KeyRing *gomatrixserverlib.KeyRing + Client *gomatrixserverlib.Client + FedClient *gomatrixserverlib.FederationClient AppserviceAPI appserviceAPI.AppServiceQueryAPI EDUInternalAPI eduServerAPI.EDUServerInputAPI @@ -61,7 +58,7 @@ type Monolith struct { // AddAllPublicRoutes attaches all public paths to the given router func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) { clientapi.AddPublicRoutes( - csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB, + csMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, @@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) syncapi.AddPublicRoutes( - csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, + csMux, m.UserAPI, m.RoomserverAPI, m.KeyAPI, m.FedClient, &m.Config.SyncAPI, ) } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 78420db1..6c54d2a0 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -15,10 +15,10 @@ package keyserver import ( - "github.com/Shopify/sarama" "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" @@ -36,8 +36,10 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer, + cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 4c138116..b2cc0728 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" + "github.com/matrix-org/dendrite/internal/setup/kafka" "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/sirupsen/logrus" @@ -41,6 +42,8 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer + _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) @@ -52,7 +55,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 2a03195c..1b692a09 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -17,7 +17,10 @@ import ( "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal" + "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) const ( @@ -160,7 +163,9 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro cfg.Defaults() cfg.Global.ServerName = testOrigin cfg.Global.Kafka.UseNaffka = true - cfg.RoomServer.Database.ConnectionString = config.DataSource(roomserverDBFileURI) + cfg.RoomServer.Database = config.DatabaseOptions{ + ConnectionString: roomserverDBFileURI, + } dp := &dummyProducer{ topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), } @@ -169,12 +174,17 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro t.Fatalf("failed to make caches: %s", err) } base := &setup.BaseDendrite{ - KafkaProducer: dp, - Caches: cache, - Cfg: cfg, + Caches: cache, + Cfg: cfg, } - - return NewInternalAPI(base, &test.NopJSONVerifier{}), dp + roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to room server db") + } + return internal.NewRoomserverAPI( + &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)), + base.Caches, &test.NopJSONVerifier{}, nil, + ), dp } func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []gomatrixserverlib.HeaderedEvent) { diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 43e2455b..de0bb434 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -17,11 +17,11 @@ package syncapi import ( "context" - "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/setup/kafka" keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -37,13 +37,14 @@ import ( // component. func AddPublicRoutes( router *mux.Router, - consumer sarama.Consumer, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db")