From 52eeeb16279497e24ed6b1e34a7a16fc69b587d1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 10 Aug 2020 15:18:37 +0100 Subject: [PATCH] Prefix-defined Kafka topics (#1254) * Prefix-defined Kafka topics * Fix current state server test --- appservice/consumers/roomserver.go | 2 +- build/gobind/monolith.go | 4 -- clientapi/clientapi.go | 2 +- cmd/dendrite-demo-libp2p/main.go | 5 -- cmd/dendrite-demo-yggdrasil/main.go | 5 -- cmd/dendritejs/main.go | 4 -- cmd/roomserver-integration-tests/main.go | 3 +- cmd/syncserver-integration-tests/main.go | 2 - currentstateserver/currentstateserver.go | 2 +- currentstateserver/currentstateserver_test.go | 6 ++- eduserver/eduserver.go | 5 +- federationsender/consumers/eduserver.go | 8 +-- federationsender/consumers/keychange.go | 2 +- federationsender/consumers/roomserver.go | 2 +- internal/config/config_global.go | 54 ------------------- internal/config/config_kafka.go | 52 ++++++++++++++++++ keyserver/keyserver.go | 2 +- roomserver/roomserver.go | 3 +- roomserver/roomserver_test.go | 2 +- syncapi/consumers/clientapi.go | 2 +- syncapi/consumers/eduserver_sendtodevice.go | 2 +- syncapi/consumers/eduserver_typing.go | 2 +- syncapi/consumers/roomserver.go | 2 +- syncapi/syncapi.go | 2 +- 24 files changed, 79 insertions(+), 96 deletions(-) create mode 100644 internal/config/config_kafka.go diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 84f2b9ab..3c9e52da 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -48,7 +48,7 @@ func NewOutputRoomEventConsumer( workerStates []types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Global.Kafka.Topics.OutputRoomEvent), + Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: appserviceDB, } diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 9c3880b1..e4373490 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -88,10 +88,6 @@ func (m *DendriteMonolith) Start() { cfg.Global.PrivateKey = ygg.SigningPrivateKey() cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Topics.OutputRoomEvent = "roomserverOutput" - cfg.Global.Kafka.Topics.OutputClientData = "clientapiOutput" - cfg.Global.Kafka.Topics.OutputTypingEvent = "typingServerOutput" - cfg.Global.Kafka.Topics.OutputSendToDeviceEvent = "sendToDeviceOutput" cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-naffka.db", m.StorageDirectory)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-account.db", m.StorageDirectory)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-device.db", m.StorageDirectory)) diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index f3789521..1a4307c1 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -54,7 +54,7 @@ func AddPublicRoutes( ) { syncProducer := &producers.SyncAPIProducer{ Producer: producer, - Topic: string(cfg.Matrix.Kafka.Topics.OutputClientData), + Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), } routing.Setup( diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 8c28014a..e3035547 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -118,11 +118,6 @@ func main() { cfg.Global.PrivateKey = privKey cfg.Global.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName)) cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Topics.OutputRoomEvent = "roomserverOutput" - cfg.Global.Kafka.Topics.OutputClientData = "clientapiOutput" - cfg.Global.Kafka.Topics.OutputTypingEvent = "typingServerOutput" - cfg.Global.Kafka.Topics.OutputSendToDeviceEvent = "sendToDeviceOutput" - cfg.Global.Kafka.Topics.OutputKeyChangeEvent = "keyChangeOutput" cfg.FederationSender.FederationMaxRetries = 6 cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index cf4c7735..33ed1542 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -73,11 +73,6 @@ func main() { cfg.Global.PrivateKey = ygg.SigningPrivateKey() cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Global.Kafka.UseNaffka = true - cfg.Global.Kafka.Topics.OutputRoomEvent = "roomserverOutput" - cfg.Global.Kafka.Topics.OutputClientData = "clientapiOutput" - cfg.Global.Kafka.Topics.OutputTypingEvent = "typingServerOutput" - cfg.Global.Kafka.Topics.OutputSendToDeviceEvent = "sendToDeviceOutput" - cfg.Global.Kafka.Topics.OutputKeyChangeEvent = "keyChangeOutput" cfg.FederationSender.FederationMaxRetries = 8 cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index ce7812fa..f350a938 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -174,10 +174,6 @@ func main() { cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db" cfg.Global.Kafka.UseNaffka = true cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db" - cfg.Global.Kafka.Topics.OutputTypingEvent = "output_typing_event" - cfg.Global.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event" - cfg.Global.Kafka.Topics.OutputClientData = "output_client_data" - cfg.Global.Kafka.Topics.OutputRoomEvent = "output_room_event" cfg.Global.TrustedIDServers = []string{ "matrix.org", "vector.im", } diff --git a/cmd/roomserver-integration-tests/main.go b/cmd/roomserver-integration-tests/main.go index 4d3095be..a9c88ae1 100644 --- a/cmd/roomserver-integration-tests/main.go +++ b/cmd/roomserver-integration-tests/main.go @@ -29,6 +29,7 @@ import ( "net/http" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/inthttp" @@ -240,7 +241,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R panic(err) } - outputTopic := string(cfg.Global.Kafka.Topics.OutputRoomEvent) + outputTopic := cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent) err = exe.DeleteTopic(outputTopic) if err != nil { diff --git a/cmd/syncserver-integration-tests/main.go b/cmd/syncserver-integration-tests/main.go index 76fa0eba..bba8fd96 100644 --- a/cmd/syncserver-integration-tests/main.go +++ b/cmd/syncserver-integration-tests/main.go @@ -134,8 +134,6 @@ func startSyncServer() (*exec.Cmd, chan error) { // TODO use the address assigned by the config generator rather than clobbering. cfg.Global.ServerName = "localhost" cfg.SyncAPI.Listen = config.Address(syncserverAddr) - cfg.Global.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic) - cfg.Global.Kafka.Topics.OutputClientData = config.Topic(clientTopic) if err := test.WriteConfig(cfg, dir); err != nil { panic(err) diff --git a/currentstateserver/currentstateserver.go b/currentstateserver/currentstateserver.go index 8e985e84..f0dd4b88 100644 --- a/currentstateserver/currentstateserver.go +++ b/currentstateserver/currentstateserver.go @@ -40,7 +40,7 @@ func NewInternalAPI(cfg *config.CurrentStateServer, consumer sarama.Consumer) ap logrus.WithError(err).Panicf("failed to open database") } roomConsumer := consumers.NewOutputRoomEventConsumer( - string(cfg.Matrix.Kafka.Topics.OutputRoomEvent), consumer, csDB, + cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent), consumer, csDB, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") diff --git a/currentstateserver/currentstateserver_test.go b/currentstateserver/currentstateserver_test.go index 2b0e40c2..bb5b40ff 100644 --- a/currentstateserver/currentstateserver_test.go +++ b/currentstateserver/currentstateserver_test.go @@ -19,6 +19,7 @@ import ( "context" "crypto/ed25519" "encoding/json" + "fmt" "net/http" "os" "reflect" @@ -54,7 +55,8 @@ var ( testEvents = []gomatrixserverlib.HeaderedEvent{} testStateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent) - kafkaTopic = "room_events" + kafkaPrefix = "Dendrite" + kafkaTopic = fmt.Sprintf("%s%s", kafkaPrefix, "OutputRoomEvent") ) func init() { @@ -99,8 +101,8 @@ func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, sarama.Sync stateDBName := "test_state.db" naffkaDBName := "test_naffka.db" cfg.Global.ServerName = "kaer.morhen" - cfg.Global.Kafka.Topics.OutputRoomEvent = config.Topic(kafkaTopic) cfg.CurrentStateServer.Database.ConnectionString = config.DataSource("file:" + stateDBName) + cfg.Global.Kafka.TopicPrefix = kafkaPrefix db, err := sqlutil.Open(&config.DatabaseOptions{ ConnectionString: config.DataSource("file:" + naffkaDBName), }) diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index e0e61b1a..b6196c26 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -22,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/input" "github.com/matrix-org/dendrite/eduserver/inthttp" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -44,8 +45,8 @@ func NewInternalAPI( Cache: eduCache, UserAPI: userAPI, Producer: base.KafkaProducer, - OutputTypingEventTopic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent), - OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent), + OutputTypingEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), + OutputSendToDeviceEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), ServerName: cfg.Matrix.ServerName, } } diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 74ce65db..e1a42f07 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -50,20 +50,20 @@ func NewOutputEDUConsumer( ) *OutputEDUConsumer { c := &OutputEDUConsumer{ typingConsumer: &internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, sendToDeviceConsumer: &internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - TypingTopic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent), - SendToDeviceTopic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent), + TypingTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), + SendToDeviceTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), } c.typingConsumer.ProcessMessage = c.onTypingEvent c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index 8060125e..c1136f10 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -49,7 +49,7 @@ func NewKeyChangeConsumer( ) *KeyChangeConsumer { c := &KeyChangeConsumer{ consumer: &internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index b3a4cde3..e09350f8 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -49,7 +49,7 @@ func NewOutputRoomEventConsumer( rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/internal/config/config_global.go b/internal/config/config_global.go index 9456dd3f..785a8033 100644 --- a/internal/config/config_global.go +++ b/internal/config/config_global.go @@ -60,60 +60,6 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { c.Metrics.Verify(configErrs, isMonolith) } -type Kafka struct { - // A list of kafka addresses to connect to. - Addresses []string `yaml:"addresses"` - // Whether to use naffka instead of kafka. - // Naffka can only be used when running dendrite as a single monolithic server. - // Kafka can be used both with a monolithic server and when running the - // components as separate servers. - UseNaffka bool `yaml:"use_naffka"` - // The Naffka database is used internally by the naffka library, if used. - Database DatabaseOptions `yaml:"naffka_database"` - // The names of the topics to use when reading and writing from kafka. - Topics struct { - // Topic for roomserver/api.OutputRoomEvent events. - OutputRoomEvent Topic `yaml:"output_room_event"` - // Topic for sending account data from client API to sync API - OutputClientData Topic `yaml:"output_client_data"` - // Topic for eduserver/api.OutputTypingEvent events. - OutputTypingEvent Topic `yaml:"output_typing_event"` - // Topic for eduserver/api.OutputSendToDeviceEvent events. - OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"` - // Topic for keyserver when new device keys are added. - OutputKeyChangeEvent Topic `yaml:"output_key_change_event"` - } -} - -func (c *Kafka) Defaults() { - c.UseNaffka = true - c.Database.Defaults() - c.Database.ConnectionString = DataSource("file:naffka.db") - c.Topics.OutputRoomEvent = "OutputRoomEventTopic" - c.Topics.OutputClientData = "OutputClientDataTopic" - c.Topics.OutputTypingEvent = "OutputTypingEventTopic" - c.Topics.OutputSendToDeviceEvent = "OutputSendToDeviceEventTopic" - c.Topics.OutputKeyChangeEvent = "OutputKeyChangeEventTopic" -} - -func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { - if c.UseNaffka { - if !isMonolith { - configErrs.Add("naffka can only be used in a monolithic server") - } - checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) - } else { - // If we aren't using naffka then we need to have at least one kafka - // server to talk to. - checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) - } - checkNotEmpty(configErrs, "global.kafka.topics.output_room_event", string(c.Topics.OutputRoomEvent)) - checkNotEmpty(configErrs, "global.kafka.topics.output_client_data", string(c.Topics.OutputClientData)) - checkNotEmpty(configErrs, "global.kafka.topics.output_typing_event", string(c.Topics.OutputTypingEvent)) - checkNotEmpty(configErrs, "global.kafka.topics.output_send_to_device_event", string(c.Topics.OutputSendToDeviceEvent)) - checkNotEmpty(configErrs, "global.kafka.topics.output_key_change_event", string(c.Topics.OutputKeyChangeEvent)) -} - // The configuration to use for Prometheus metrics type Metrics struct { // Whether or not the metrics are enabled diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go new file mode 100644 index 00000000..43a27cf2 --- /dev/null +++ b/internal/config/config_kafka.go @@ -0,0 +1,52 @@ +package config + +import "fmt" + +// Defined Kafka topics. +const ( + TopicOutputTypingEvent = "OutputTypingEvent" + TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" + TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" + TopicOutputRoomEvent = "OutputRoomEvent" + TopicOutputClientData = "OutputClientData" +) + +type Kafka struct { + // A list of kafka addresses to connect to. + Addresses []string `yaml:"addresses"` + // Whether to use naffka instead of kafka. + // Naffka can only be used when running dendrite as a single monolithic server. + // Kafka can be used both with a monolithic server and when running the + // components as separate servers. + UseNaffka bool `yaml:"use_naffka"` + // The Naffka database is used internally by the naffka library, if used. + Database DatabaseOptions `yaml:"naffka_database"` + // The prefix to use for Kafka topic names for this homeserver - really only + // useful if running more than one Dendrite on the same Kafka deployment. + TopicPrefix string `yaml:"topic_prefix"` +} + +func (k *Kafka) TopicFor(name string) string { + return fmt.Sprintf("%s%s", k.TopicPrefix, name) +} + +func (c *Kafka) Defaults() { + c.UseNaffka = true + c.Database.Defaults() + c.Database.ConnectionString = DataSource("file:naffka.db") + c.TopicPrefix = "Dendrite" +} + +func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { + if c.UseNaffka { + if !isMonolith { + configErrs.Add("naffka can only be used in a monolithic server") + } + checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) + } else { + // If we aren't using naffka then we need to have at least one kafka + // server to talk to. + checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) + } + checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) +} diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index b5bbb519..04136938 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -43,7 +43,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), Producer: producer, DB: db, } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 1c226085..1f331a8f 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/inthttp" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" @@ -50,7 +51,7 @@ func NewInternalAPI( DB: roomserverDB, Cfg: cfg, Producer: base.KafkaProducer, - OutputRoomEventTopic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent), + OutputRoomEventTopic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), Cache: base.Caches, ServerName: cfg.Matrix.ServerName, FedClient: fedClient, diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index d11c63b2..bcd9afb3 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -100,7 +100,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js cfg.Global.Kafka.UseNaffka = true cfg.RoomServer.Database.ConnectionString = config.DataSource(roomserverDBFileURI) dp := &dummyProducer{ - topic: string(cfg.Global.Kafka.Topics.OutputRoomEvent), + topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), } cache, err := caching.NewInMemoryLRUCache(true) if err != nil { diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index ceaa735a..6a1e590a 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -44,7 +44,7 @@ func NewOutputClientDataConsumer( ) *OutputClientDataConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputClientData), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 20dd1756..90bfe3e5 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -48,7 +48,7 @@ func NewOutputSendToDeviceEventConsumer( ) *OutputSendToDeviceEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index fc5703d3..523728cd 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -44,7 +44,7 @@ func NewOutputTypingEventConsumer( ) *OutputTypingEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 06c904c3..1af58837 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -49,7 +49,7 @@ func NewOutputRoomEventConsumer( ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent), + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 9caed7be..0f4ea828 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -65,7 +65,7 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent), + cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), consumer, notifier, keyAPI, currentStateAPI, syncDB, ) if err = keyChangeConsumer.Start(); err != nil {