diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go index d20cf6d2..4675a773 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -28,28 +28,28 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -// OutputRoomEvent consumes events that originated in the room server. -type OutputRoomEvent struct { +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db *accounts.Database query api.RoomserverQueryAPI serverName string } -// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent( +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, store *accounts.Database, queryAPI api.RoomserverQueryAPI, -) *OutputRoomEvent { +) *OutputRoomEventConsumer { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: store, } - s := &OutputRoomEvent{ + s := &OutputRoomEventConsumer{ roomServerConsumer: &consumer, db: store, query: queryAPI, @@ -61,14 +61,14 @@ func NewOutputRoomEvent( } // Start consuming from room servers -func (s *OutputRoomEvent) Start() error { +func (s *OutputRoomEventConsumer) Start() error { return s.roomServerConsumer.Start() } // onMessage is called when the sync server receives a new event from the room server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -104,7 +104,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { } // lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEvent) lookupStateEvents( +func (s *OutputRoomEventConsumer) lookupStateEvents( addsStateEventIDs []string, event gomatrixserverlib.Event, ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index c53735aa..9fe8155c 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -114,7 +114,7 @@ func main() { }).Panic("Failed to setup kafka consumers") } - consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI) + consumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, accountDB, queryAPI) if err = consumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go index 0b5a73ee..d8c34c03 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go @@ -74,7 +74,7 @@ func main() { queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation) - consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, queues, db, queryAPI) + consumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, queues, db, queryAPI) if err = consumer.Start(); err != nil { log.WithError(err).Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 811c708a..5dc846aa 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -285,28 +285,28 @@ func (m *monolith) setupNotifiers() { func (m *monolith) setupConsumers() { var err error - clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent( + clientAPIConsumer := clientapi_consumers.NewOutputRoomEventConsumer( m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI, ) if err = clientAPIConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } - syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent( + syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEventConsumer( m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI, ) if err = syncAPIRoomConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } - syncAPIClientConsumer := syncapi_consumers.NewOutputClientData( + syncAPIClientConsumer := syncapi_consumers.NewOutputClientDataConsumer( m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, ) if err = syncAPIClientConsumer.Start(); err != nil { log.Panicf("startup: failed to start client API server consumer: %s", err) } - publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent( + publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEventConsumer( m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI, ) if err = publicRoomsAPIConsumer.Start(); err != nil { @@ -315,7 +315,7 @@ func (m *monolith) setupConsumers() { federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) - federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent( + federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEventConsumer( m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI, ) if err = federationSenderRoomConsumer.Start(); err != nil { diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go index 807aa0ca..30dd2ee9 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go @@ -73,7 +73,7 @@ func main() { }).Panic("Failed to setup kafka consumers") } - roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db, queryAPI) + roomConsumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, db, queryAPI) if err != nil { log.Panicf("startup: failed to create room server consumer: %s", err) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index ec6eebb6..ff210500 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -92,11 +92,11 @@ func main() { }).Panic("Failed to setup kafka consumers") } - roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, n, db, queryAPI) + roomConsumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, n, db, queryAPI) if err = roomConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } - clientConsumer := consumers.NewOutputClientData(cfg, kafkaConsumer, n, db) + clientConsumer := consumers.NewOutputClientDataConsumer(cfg, kafkaConsumer, n, db) if err = clientConsumer.Start(); err != nil { log.Panicf("startup: failed to start client API server consumer: %s", err) } diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index 04fc1265..3dc7159b 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -30,28 +30,28 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -// OutputRoomEvent consumes events that originated in the room server. -type OutputRoomEvent struct { +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db *storage.Database queues *queue.OutgoingQueues query api.RoomserverQueryAPI } -// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent( +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store *storage.Database, queryAPI api.RoomserverQueryAPI, -) *OutputRoomEvent { +) *OutputRoomEventConsumer { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: store, } - s := &OutputRoomEvent{ + s := &OutputRoomEventConsumer{ roomServerConsumer: &consumer, db: store, queues: queues, @@ -63,7 +63,7 @@ func NewOutputRoomEvent( } // Start consuming from room servers -func (s *OutputRoomEvent) Start() error { +func (s *OutputRoomEventConsumer) Start() error { return s.roomServerConsumer.Start() } @@ -71,7 +71,7 @@ func (s *OutputRoomEvent) Start() error { // It is unsafe to call this with messages for the same room in multiple gorountines // because updates it will likely fail with a types.EventIDMismatchError when it // realises that it cannot update the room state using the deltas. -func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -108,7 +108,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. -func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error { +func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) if err != nil { return err @@ -164,7 +164,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error { // Usually the list can be calculated locally, but sometimes it will need fetch // events from the room server. // Returns an error if there was a problem talking to the room server. -func (s *OutputRoomEvent) joinedHostsAtEvent( +func (s *OutputRoomEventConsumer) joinedHostsAtEvent( ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, ) ([]gomatrixserverlib.ServerName, error) { // Combine the delta into a single delta so that the adds and removes can @@ -283,7 +283,7 @@ func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []s } // lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEvent) lookupStateEvents( +func (s *OutputRoomEventConsumer) lookupStateEvents( addsStateEventIDs []string, event gomatrixserverlib.Event, ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go index 65209bf3..58ac3194 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -26,26 +26,26 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -// OutputRoomEvent consumes events that originated in the room server. -type OutputRoomEvent struct { +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db *storage.PublicRoomsServerDatabase query api.RoomserverQueryAPI } -// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent( +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, store *storage.PublicRoomsServerDatabase, queryAPI api.RoomserverQueryAPI, -) *OutputRoomEvent { +) *OutputRoomEventConsumer { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: store, } - s := &OutputRoomEvent{ + s := &OutputRoomEventConsumer{ roomServerConsumer: &consumer, db: store, query: queryAPI, @@ -56,12 +56,12 @@ func NewOutputRoomEvent( } // Start consuming from room servers -func (s *OutputRoomEvent) Start() error { +func (s *OutputRoomEventConsumer) Start() error { return s.roomServerConsumer.Start() } // onMessage is called when the sync server receives a new event from the room server output log. -func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go index 60ed01b6..18f1c1e5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -26,27 +26,27 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -// OutputClientData consumes events that originated in the client API server. -type OutputClientData struct { +// OutputClientDataConsumer consumes events that originated in the client API server. +type OutputClientDataConsumer struct { clientAPIConsumer *common.ContinualConsumer db *storage.SyncServerDatabase notifier *sync.Notifier } -// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. -func NewOutputClientData( +// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. +func NewOutputClientDataConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, n *sync.Notifier, store *storage.SyncServerDatabase, -) *OutputClientData { +) *OutputClientDataConsumer { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputClientData), Consumer: kafkaConsumer, PartitionStore: store, } - s := &OutputClientData{ + s := &OutputClientDataConsumer{ clientAPIConsumer: &consumer, db: store, notifier: n, @@ -57,14 +57,14 @@ func NewOutputClientData( } // Start consuming from room servers -func (s *OutputClientData) Start() error { +func (s *OutputClientDataConsumer) Start() error { return s.clientAPIConsumer.Start() } // onMessage is called when the sync server receives a new event from the client API server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output common.AccountData if err := json.Unmarshal(msg.Value, &output); err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 7c40247b..0a3e2153 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -30,29 +30,29 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -// OutputRoomEvent consumes events that originated in the room server. -type OutputRoomEvent struct { +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase notifier *sync.Notifier query api.RoomserverQueryAPI } -// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent( +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, n *sync.Notifier, store *storage.SyncServerDatabase, queryAPI api.RoomserverQueryAPI, -) *OutputRoomEvent { +) *OutputRoomEventConsumer { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: store, } - s := &OutputRoomEvent{ + s := &OutputRoomEventConsumer{ roomServerConsumer: &consumer, db: store, notifier: n, @@ -64,14 +64,14 @@ func NewOutputRoomEvent( } // Start consuming from room servers -func (s *OutputRoomEvent) Start() error { +func (s *OutputRoomEventConsumer) Start() error { return s.roomServerConsumer.Start() } // onMessage is called when the sync server receives a new event from the room server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent if err := json.Unmarshal(msg.Value, &output); err != nil { @@ -95,7 +95,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { } } -func (s *OutputRoomEvent) onNewRoomEvent( +func (s *OutputRoomEventConsumer) onNewRoomEvent( ctx context.Context, msg api.OutputNewRoomEvent, ) error { ev := msg.Event @@ -152,7 +152,7 @@ func (s *OutputRoomEvent) onNewRoomEvent( return nil } -func (s *OutputRoomEvent) onNewInviteEvent( +func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, ) error { syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event) @@ -168,7 +168,7 @@ func (s *OutputRoomEvent) onNewInviteEvent( return nil } -func (s *OutputRoomEvent) onRetireInviteEvent( +func (s *OutputRoomEventConsumer) onRetireInviteEvent( ctx context.Context, msg api.OutputRetireInviteEvent, ) error { err := s.db.RetireInviteEvent(ctx, msg.EventID) @@ -186,7 +186,7 @@ func (s *OutputRoomEvent) onRetireInviteEvent( } // lookupStateEvents looks up the state events that are added by a new event. -func (s *OutputRoomEvent) lookupStateEvents( +func (s *OutputRoomEventConsumer) lookupStateEvents( addsStateEventIDs []string, event gomatrixserverlib.Event, ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. @@ -242,7 +242,7 @@ func (s *OutputRoomEvent) lookupStateEvents( return result, nil } -func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) { +func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) { var stateKey string if event.StateKey() == nil { stateKey = ""