Summarise key change logs (#1278)
parent
02a8515e99
commit
e571e196ce
|
@ -42,6 +42,7 @@ func (p *KeyChange) DefaultPartition() int32 {
|
|||
|
||||
// ProduceKeyChanges creates new change events for each key
|
||||
func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
|
||||
userToDeviceCount := make(map[string]int)
|
||||
for _, key := range keys {
|
||||
var m sarama.ProducerMessage
|
||||
|
||||
|
@ -62,12 +63,12 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userToDeviceCount[key.UserID]++
|
||||
}
|
||||
for userID, count := range userToDeviceCount {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"user_id": key.UserID,
|
||||
"device_id": key.DeviceID,
|
||||
"partition": partition,
|
||||
"offset": offset,
|
||||
"len_key_bytes": len(key.KeyJSON),
|
||||
"user_id": userID,
|
||||
"num_key_changes": count,
|
||||
}).Infof("Produced to key change topic '%s'", p.Topic)
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue