diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 987815c2..90db9eea 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -51,11 +51,6 @@ func AddPublicRoutes( roomserverProducer := producers.NewRoomserverProducer(rsAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI) - userUpdateProducer := &producers.UserUpdateProducer{ - Producer: producer, - Topic: string(cfg.Kafka.Topics.UserUpdates), - } - syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: string(cfg.Kafka.Topics.OutputClientData), @@ -70,7 +65,7 @@ func AddPublicRoutes( routing.Setup( router, cfg, roomserverProducer, rsAPI, asAPI, - accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, + accountsDB, deviceDB, federation, *keyRing, syncProducer, eduProducer, transactionsCache, fsAPI, ) } diff --git a/clientapi/producers/userupdate.go b/clientapi/producers/userupdate.go deleted file mode 100644 index 02e1700f..00000000 --- a/clientapi/producers/userupdate.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package producers - -import ( - "encoding/json" - - "github.com/Shopify/sarama" -) - -// UserUpdateProducer produces events related to user updates. -type UserUpdateProducer struct { - Topic string - Producer sarama.SyncProducer -} - -// TODO: Move this struct to `internal` so the components that consume the topic -// can use it when parsing incoming messages -type profileUpdate struct { - Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`) - OldValue string `json:"old_value"` // The attribute's value before the update - NewValue string `json:"new_value"` // The attribute's value after the update -} - -// SendUpdate sends an update using kafka to notify the roomserver of the -// profile update. Returns an error if the update failed to send. -func (p *UserUpdateProducer) SendUpdate( - userID string, updatedAttribute string, oldValue string, newValue string, -) error { - var update profileUpdate - var m sarama.ProducerMessage - - m.Topic = string(p.Topic) - m.Key = sarama.StringEncoder(userID) - - update = profileUpdate{ - Updated: updatedAttribute, - OldValue: oldValue, - NewValue: newValue, - } - - value, err := json.Marshal(update) - if err != nil { - return err - } - m.Value = sarama.ByteEncoder(value) - - _, _, err = p.Producer.SendMessage(&m) - return err -} diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index dbf6ef1d..c0fe32a3 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -94,7 +94,7 @@ func GetAvatarURL( // nolint:gocyclo func SetAvatarURL( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite, + userID string, cfg *config.Dendrite, rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { @@ -104,8 +104,6 @@ func SetAvatarURL( } } - changedKey := "avatar_url" - var r internal.AvatarURL if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { return *resErr @@ -174,11 +172,6 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if err := producer.SendUpdate(userID, changedKey, oldProfile.AvatarURL, r.AvatarURL); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendUpdate failed") - return jsonerror.InternalServerError() - } - return util.JSONResponse{ Code: http.StatusOK, JSON: struct{}{}, @@ -216,7 +209,7 @@ func GetDisplayName( // nolint:gocyclo func SetDisplayName( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite, + userID string, cfg *config.Dendrite, rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { @@ -226,8 +219,6 @@ func SetDisplayName( } } - changedKey := "displayname" - var r internal.DisplayName if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { return *resErr @@ -296,11 +287,6 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if err := producer.SendUpdate(userID, changedKey, oldProfile.DisplayName, r.DisplayName); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendUpdate failed") - return jsonerror.InternalServerError() - } - return util.JSONResponse{ Code: http.StatusOK, JSON: struct{}{}, diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 83e399ac..eb558205 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -55,7 +55,6 @@ func Setup( deviceDB devices.Database, federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, - userUpdateProducer *producers.UserUpdateProducer, syncProducer *producers.SyncAPIProducer, eduProducer *producers.EDUServerProducer, transactionsCache *transactions.Cache, @@ -387,7 +386,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, cfg, producer, rsAPI) + return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -409,7 +408,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, cfg, producer, rsAPI) + return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 377e017e..ba1af148 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -109,7 +109,6 @@ func main() { cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" cfg.Kafka.Topics.OutputClientData = "clientapiOutput" cfg.Kafka.Topics.OutputTypingEvent = "typingServerOutput" - cfg.Kafka.Topics.UserUpdates = "userUpdates" cfg.Database.Account = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.Database.Device = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) cfg.Database.MediaAPI = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 9901c6e5..86ae3368 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -169,7 +169,6 @@ func main() { cfg.Database.RoomServer = "file:/idb/dendritejs_roomserver.db" cfg.Database.ServerKey = "file:/idb/dendritejs_serverkey.db" cfg.Database.SyncAPI = "file:/idb/dendritejs_syncapi.db" - cfg.Kafka.Topics.UserUpdates = "user_updates" cfg.Kafka.Topics.OutputTypingEvent = "output_typing_event" cfg.Kafka.Topics.OutputSendToDeviceEvent = "output_send_to_device_event" cfg.Kafka.Topics.OutputClientData = "output_client_data" diff --git a/internal/config/config.go b/internal/config/config.go index 858646ab..bff4945b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -154,8 +154,6 @@ type Dendrite struct { OutputTypingEvent Topic `yaml:"output_typing_event"` // Topic for eduserver/api.OutputSendToDeviceEvent events. OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"` - // Topic for user updates (profile, presence) - UserUpdates Topic `yaml:"user_updates"` } } `yaml:"kafka"` @@ -591,7 +589,6 @@ func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) { checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent)) - checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates)) } // checkDatabase verifies the parameters database.* are valid. diff --git a/internal/test/config.go b/internal/test/config.go index 06510c8b..951f65a1 100644 --- a/internal/test/config.go +++ b/internal/test/config.go @@ -84,7 +84,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output" - cfg.Kafka.Topics.UserUpdates = "test.user.output" // TODO: Use different databases for the different schemas. // Using the same database for every schema currently works because