Notify profile update (#153)
* Profile retrieval * Saving avatar (without propagating it) * Saving display name (without propagating it) * Getters for display name and avatar URL * Doc'd * Introduced new Kafka topic and producer * Updated config with new kafka topic * Switched to samara producer and now sending messages * Doc'd * Put kafka update after the database insert * Doc'd profileUpdate structure * Removed unused parameter * Moved user updates producer to clientapi/producersmain
parent
1efbad8119
commit
355ab5eedf
|
@ -56,6 +56,7 @@ kafka:
|
||||||
topics:
|
topics:
|
||||||
input_room_event: roomserverInput
|
input_room_event: roomserverInput
|
||||||
output_room_event: roomserverOutput
|
output_room_event: roomserverOutput
|
||||||
|
user_updates: userUpdates
|
||||||
|
|
||||||
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
# The postgres connection configs for connecting to the databases e.g a postgres:// URI
|
||||||
database:
|
database:
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
// Copyright 2017 Vector Creations Ltd
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package producers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// UserUpdateProducer produces events related to user updates.
|
||||||
|
type UserUpdateProducer struct {
|
||||||
|
Topic string
|
||||||
|
Producer sarama.SyncProducer
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Move this struct to `common` so the components that consume the topic
|
||||||
|
// can use it when parsing incoming messages
|
||||||
|
type profileUpdate struct {
|
||||||
|
Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`)
|
||||||
|
OldValue string `json:"old_value"` // The attribute's value before the update
|
||||||
|
NewValue string `json:"new_value"` // The attribute's value after the update
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUserUpdateProducer creates a new UserUpdateProducer
|
||||||
|
func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) {
|
||||||
|
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &UserUpdateProducer{
|
||||||
|
Topic: topic,
|
||||||
|
Producer: producer,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendUpdate sends an update using kafka to notify the roomserver of the
|
||||||
|
// profile update. Returns an error if the update failed to send.
|
||||||
|
func (p *UserUpdateProducer) SendUpdate(
|
||||||
|
userID string, updatedAttribute string, oldValue string, newValue string,
|
||||||
|
) error {
|
||||||
|
var update profileUpdate
|
||||||
|
var m sarama.ProducerMessage
|
||||||
|
|
||||||
|
m.Topic = string(p.Topic)
|
||||||
|
m.Key = sarama.StringEncoder(userID)
|
||||||
|
|
||||||
|
update = profileUpdate{
|
||||||
|
Updated: updatedAttribute,
|
||||||
|
OldValue: oldValue,
|
||||||
|
NewValue: newValue,
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err := json.Marshal(update)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.Value = sarama.ByteEncoder(value)
|
||||||
|
|
||||||
|
if _, _, err := p.Producer.SendMessage(&m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -22,6 +22,8 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
|
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -84,6 +86,7 @@ func GetAvatarURL(
|
||||||
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
||||||
func SetAvatarURL(
|
func SetAvatarURL(
|
||||||
req *http.Request, accountDB *accounts.Database, userID string,
|
req *http.Request, accountDB *accounts.Database, userID string,
|
||||||
|
producer *producers.UserUpdateProducer,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
var r avatarURL
|
var r avatarURL
|
||||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||||
|
@ -97,9 +100,20 @@ func SetAvatarURL(
|
||||||
}
|
}
|
||||||
|
|
||||||
localpart := getLocalPart(userID)
|
localpart := getLocalPart(userID)
|
||||||
|
|
||||||
|
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||||
|
if err != nil {
|
||||||
|
return httputil.LogThenError(req, err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
|
if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := producer.SendUpdate(userID, "avatar_url", oldProfile.AvatarURL, r.AvatarURL); err != nil {
|
||||||
|
return httputil.LogThenError(req, err)
|
||||||
|
}
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: struct{}{},
|
JSON: struct{}{},
|
||||||
|
@ -127,6 +141,7 @@ func GetDisplayName(
|
||||||
// SetDisplayName implements PUT /profile/{userID}/displayname
|
// SetDisplayName implements PUT /profile/{userID}/displayname
|
||||||
func SetDisplayName(
|
func SetDisplayName(
|
||||||
req *http.Request, accountDB *accounts.Database, userID string,
|
req *http.Request, accountDB *accounts.Database, userID string,
|
||||||
|
producer *producers.UserUpdateProducer,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
var r displayName
|
var r displayName
|
||||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||||
|
@ -140,9 +155,20 @@ func SetDisplayName(
|
||||||
}
|
}
|
||||||
|
|
||||||
localpart := getLocalPart(userID)
|
localpart := getLocalPart(userID)
|
||||||
|
|
||||||
|
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||||
|
if err != nil {
|
||||||
|
return httputil.LogThenError(req, err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
|
if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := producer.SendUpdate(userID, "displayname", oldProfile.DisplayName, r.DisplayName); err != nil {
|
||||||
|
return httputil.LogThenError(req, err)
|
||||||
|
}
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: struct{}{},
|
JSON: struct{}{},
|
||||||
|
|
|
@ -46,6 +46,7 @@ func Setup(
|
||||||
deviceDB *devices.Database,
|
deviceDB *devices.Database,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
keyRing gomatrixserverlib.KeyRing,
|
keyRing gomatrixserverlib.KeyRing,
|
||||||
|
userUpdateProducer *producers.UserUpdateProducer,
|
||||||
) {
|
) {
|
||||||
apiMux := mux.NewRouter()
|
apiMux := mux.NewRouter()
|
||||||
|
|
||||||
|
@ -178,7 +179,7 @@ func Setup(
|
||||||
r0mux.Handle("/profile/{userID}/avatar_url",
|
r0mux.Handle("/profile/{userID}/avatar_url",
|
||||||
common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
return readers.SetAvatarURL(req, accountDB, vars["userID"])
|
return readers.SetAvatarURL(req, accountDB, vars["userID"], userUpdateProducer)
|
||||||
}),
|
}),
|
||||||
).Methods("PUT", "OPTIONS")
|
).Methods("PUT", "OPTIONS")
|
||||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||||
|
@ -194,7 +195,7 @@ func Setup(
|
||||||
r0mux.Handle("/profile/{userID}/displayname",
|
r0mux.Handle("/profile/{userID}/displayname",
|
||||||
common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
vars := mux.Vars(req)
|
vars := mux.Vars(req)
|
||||||
return readers.SetDisplayName(req, accountDB, vars["userID"])
|
return readers.SetDisplayName(req, accountDB, vars["userID"], userUpdateProducer)
|
||||||
}),
|
}),
|
||||||
).Methods("PUT", "OPTIONS")
|
).Methods("PUT", "OPTIONS")
|
||||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||||
|
|
|
@ -53,6 +53,9 @@ func main() {
|
||||||
roomserverProducer, err := producers.NewRoomserverProducer(
|
roomserverProducer, err := producers.NewRoomserverProducer(
|
||||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
|
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
|
||||||
)
|
)
|
||||||
|
userUpdateProducer, err := producers.NewUserUpdateProducer(
|
||||||
|
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
|
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
|
||||||
}
|
}
|
||||||
|
@ -86,7 +89,7 @@ func main() {
|
||||||
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
|
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
|
||||||
routing.Setup(
|
routing.Setup(
|
||||||
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
|
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
|
||||||
queryAPI, accountDB, deviceDB, federation, keyRing,
|
queryAPI, accountDB, deviceDB, federation, keyRing, userUpdateProducer,
|
||||||
)
|
)
|
||||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
|
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,8 @@ type Dendrite struct {
|
||||||
InputRoomEvent Topic `yaml:"input_room_event"`
|
InputRoomEvent Topic `yaml:"input_room_event"`
|
||||||
// Topic for roomserver/api.OutputRoomEvent events.
|
// Topic for roomserver/api.OutputRoomEvent events.
|
||||||
OutputRoomEvent Topic `yaml:"output_room_event"`
|
OutputRoomEvent Topic `yaml:"output_room_event"`
|
||||||
|
// Topic for user updates (profile, presence)
|
||||||
|
UserUpdates Topic `yaml:"user_updates"`
|
||||||
}
|
}
|
||||||
} `yaml:"kafka"`
|
} `yaml:"kafka"`
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue