Make account data sync incremental (#170)
* Clean roomserver consumer
* Make account data sync incremental
* Use a different name for the sync AD table
* Improved error logging
* Created missing topic in tests
* Add client API topic to tests
* Add client API topic to common
* Move data batch retrieval
* Add database index for data retrieval
* Fix typo in table name
* Fix indentation
parent 7d17df6f51
commit 0fbb8b7824
@@ -56,6 +56,7 @@ kafka:
   topics:
     input_room_event: roomserverInput
     output_room_event: roomserverOutput
+    output_client_data: clientapiOutput
     user_updates: userUpdates
 
 # The postgres connection configs for connecting to the databases e.g a postgres:// URI
@@ -44,12 +44,16 @@ const insertAccountDataSQL = `
 const selectAccountDataSQL = "" +
 	"SELECT room_id, type, content FROM account_data WHERE localpart = $1"
 
+const selectAccountDataByTypeSQL = "" +
+	"SELECT content FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3"
+
 const deleteAccountDataSQL = "" +
 	"DELETE FROM account_data WHERE localpart = $1 AND room_id = $2 AND type = $3"
 
 type accountDataStatements struct {
 	insertAccountDataStmt       *sql.Stmt
 	selectAccountDataStmt       *sql.Stmt
+	selectAccountDataByTypeStmt *sql.Stmt
 }
 
 func (s *accountDataStatements) prepare(db *sql.DB) (err error) {

@@ -63,6 +67,9 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
 	if s.selectAccountDataStmt, err = db.Prepare(selectAccountDataSQL); err != nil {
 		return
 	}
+	if s.selectAccountDataByTypeStmt, err = db.Prepare(selectAccountDataByTypeSQL); err != nil {
+		return
+	}
 	return
 }
 

@@ -107,3 +114,31 @@ func (s *accountDataStatements) selectAccountData(localpart string) (
 
 	return
 }
+
+func (s *accountDataStatements) selectAccountDataByType(
+	localpart string, roomID string, dataType string,
+) (data []gomatrixserverlib.ClientEvent, err error) {
+	data = []gomatrixserverlib.ClientEvent{}
+
+	rows, err := s.selectAccountDataByTypeStmt.Query(localpart, roomID, dataType)
+	if err != nil {
+		return
+	}
+
+	for rows.Next() {
+		var content []byte
+
+		if err = rows.Scan(&content); err != nil {
+			return
+		}
+
+		ac := gomatrixserverlib.ClientEvent{
+			Type:    dataType,
+			Content: content,
+		}
+
+		data = append(data, ac)
+	}
+
+	return
+}
@@ -224,6 +224,14 @@ func (d *Database) GetAccountData(localpart string) (
 	return d.accountDatas.selectAccountData(localpart)
 }
 
+// GetAccountDataByType returns account data matching a given
+// localpart, room ID and type.
+// If no account data could be found, returns an empty array
+// Returns an error if there was an issue with the retrieval
+func (d *Database) GetAccountDataByType(localpart string, roomID string, dataType string) (data []gomatrixserverlib.ClientEvent, err error) {
+	return d.accountDatas.selectAccountDataByType(localpart, roomID, dataType)
+}
+
 func hashPassword(plaintext string) (hash string, err error) {
 	hashBytes, err := bcrypt.GenerateFromPassword([]byte(plaintext), bcrypt.DefaultCost)
 	return string(hashBytes), err
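To make the new lookup concrete, here is a usage sketch (not part of the commit): a hypothetical helper that reads a single account-data type for one user and room through the client API accounts database. The localpart, room ID and event type values are placeholders.

package example

import (
	"fmt"

	"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
)

// printFullyReadMarker is a hypothetical helper illustrating the new lookup.
// It assumes an *accounts.Database has already been opened elsewhere.
func printFullyReadMarker(db *accounts.Database) error {
	// Fetch only the "m.fully_read" account data for one user in one room.
	events, err := db.GetAccountDataByType("alice", "!someroom:localhost", "m.fully_read")
	if err != nil {
		return err
	}
	for _, ev := range events {
		// ev.Content holds the raw JSON content the client stored.
		fmt.Printf("type=%s content=%s\n", ev.Type, string(ev.Content))
	}
	return nil
}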
@@ -0,0 +1,65 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+	"encoding/json"
+
+	"github.com/matrix-org/dendrite/common"
+
+	sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// SyncAPIProducer produces events for the sync API server to consume
+type SyncAPIProducer struct {
+	Topic    string
+	Producer sarama.SyncProducer
+}
+
+// NewSyncAPIProducer creates a new SyncAPIProducer
+func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) {
+	producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
+	if err != nil {
+		return nil, err
+	}
+	return &SyncAPIProducer{
+		Topic:    topic,
+		Producer: producer,
+	}, nil
+}
+
+// SendData sends account data to the sync API server
+func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
+	var m sarama.ProducerMessage
+
+	data := common.AccountData{
+		RoomID: roomID,
+		Type:   dataType,
+	}
+	value, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+
+	m.Topic = string(p.Topic)
+	m.Key = sarama.StringEncoder(userID)
+	m.Value = sarama.ByteEncoder(value)
+
+	if _, _, err := p.Producer.SendMessage(&m); err != nil {
+		return err
+	}
+
+	return nil
+}
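A minimal usage sketch for the new producer (not part of the commit). The broker address, topic name, user ID and room ID below are placeholders; in the server itself these come from the dendrite config, as the client API main() change further down shows.

package main

import (
	"log"

	"github.com/matrix-org/dendrite/clientapi/producers"
)

func main() {
	// Placeholder broker and topic; the real values come from the config
	// (kafka.addresses and kafka.topics.output_client_data).
	producer, err := producers.NewSyncAPIProducer([]string{"localhost:9092"}, "clientapiOutput")
	if err != nil {
		log.Fatalf("failed to create sync API producer: %s", err)
	}

	// Publish a notification that @alice's fully-read marker changed in a room.
	if err := producer.SendData("@alice:localhost", "!someroom:localhost", "m.fully_read"); err != nil {
		log.Fatalf("failed to send account data update: %s", err)
	}
}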
@@ -22,6 +22,7 @@ import (
 	"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
 	"github.com/matrix-org/dendrite/clientapi/httputil"
 	"github.com/matrix-org/dendrite/clientapi/jsonerror"
+	"github.com/matrix-org/dendrite/clientapi/producers"
 	"github.com/matrix-org/gomatrixserverlib"
 
 	"github.com/matrix-org/util"

@@ -30,7 +31,7 @@ import (
 // SaveAccountData implements PUT /user/{userId}/[rooms/{roomId}/]account_data/{type}
 func SaveAccountData(
 	req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
-	userID string, roomID string, dataType string,
+	userID string, roomID string, dataType string, syncProducer *producers.SyncAPIProducer,
 ) util.JSONResponse {
 	if req.Method != "PUT" {
 		return util.JSONResponse{

@@ -62,6 +63,10 @@ func SaveAccountData(
 		return httputil.LogThenError(req, err)
 	}
 
+	if err := syncProducer.SendData(userID, roomID, dataType); err != nil {
+		return httputil.LogThenError(req, err)
+	}
+
 	return util.JSONResponse{
 		Code: 200,
 		JSON: struct{}{},
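For context, a hedged sketch (not part of the commit) of the client-side request that reaches this handler, assuming the route is mounted under the standard /_matrix/client/r0 prefix; the homeserver address, user ID, room ID, event ID and access token are all placeholders.

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Placeholder homeserver URL and credentials.
	url := "http://localhost:8008/_matrix/client/r0/user/@alice:localhost/rooms/!someroom:localhost/account_data/m.fully_read?access_token=TOKEN"
	body := bytes.NewBufferString(`{"event_id": "$someevent:localhost"}`)

	req, err := http.NewRequest(http.MethodPut, url, body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}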
@@ -48,6 +48,7 @@ func Setup(
 	federation *gomatrixserverlib.FederationClient,
 	keyRing gomatrixserverlib.KeyRing,
 	userUpdateProducer *producers.UserUpdateProducer,
+	syncProducer *producers.SyncAPIProducer,
 ) {
 	apiMux := mux.NewRouter()
 

@@ -291,14 +292,14 @@ func Setup(
 	r0mux.Handle("/user/{userID}/account_data/{type}",
 		common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
 			vars := mux.Vars(req)
-			return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"])
+			return readers.SaveAccountData(req, accountDB, device, vars["userID"], "", vars["type"], syncProducer)
 		}),
 	)
 
 	r0mux.Handle("/user/{userID}/rooms/{roomID}/account_data/{type}",
 		common.MakeAuthAPI("user_account_data", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
 			vars := mux.Vars(req)
-			return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"])
+			return readers.SaveAccountData(req, accountDB, device, vars["userID"], vars["roomID"], vars["type"], syncProducer)
 		}),
 	)
 
@@ -61,6 +61,12 @@ func main() {
 	if err != nil {
 		log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
 	}
+	syncProducer, err := producers.NewSyncAPIProducer(
+		cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData),
+	)
+	if err != nil {
+		log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
+	}
 
 	federation := gomatrixserverlib.NewFederationClient(
 		cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,

@@ -99,7 +105,7 @@ func main() {
 	routing.Setup(
 		http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
 		queryAPI, aliasAPI, accountDB, deviceDB, federation, keyRing,
-		userUpdateProducer,
+		userUpdateProducer, syncProducer,
 	)
 	log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
 }
@@ -73,12 +73,19 @@ func main() {
 	if err = n.Load(db); err != nil {
 		log.Panicf("startup: failed to set up notifier: %s", err)
 	}
-	consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
+	roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
 	if err != nil {
 		log.Panicf("startup: failed to create room server consumer: %s", err)
 	}
-	if err = consumer.Start(); err != nil {
-		log.Panicf("startup: failed to start room server consumer")
+	if err = roomConsumer.Start(); err != nil {
+		log.Panicf("startup: failed to start room server consumer: %s", err)
+	}
+	clientConsumer, err := consumers.NewOutputClientData(cfg, n, db)
+	if err != nil {
+		log.Panicf("startup: failed to create client API server consumer: %s", err)
+	}
+	if err = clientConsumer.Start(); err != nil {
+		log.Panicf("startup: failed to start client API server consumer: %s", err)
 	}
 
 	log.Info("Starting sync server on ", cfg.Listen.SyncAPI)
@@ -54,6 +54,7 @@ var (
 )
 
 const inputTopic = "syncserverInput"
+const clientTopic = "clientapiserverOutput"
 
 var exe = test.KafkaExecutor{
 	ZookeeperURI: zookeeperURI,

@@ -134,6 +135,7 @@ func startSyncServer() (*exec.Cmd, chan error) {
 	cfg.Matrix.ServerName = "localhost"
 	cfg.Listen.SyncAPI = config.Address(syncserverAddr)
 	cfg.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic)
+	cfg.Kafka.Topics.OutputClientData = config.Topic(clientTopic)
 
 	if err := test.WriteConfig(cfg, dir); err != nil {
 		panic(err)

@@ -177,6 +179,10 @@ func prepareKafka() {
 	if err := exe.CreateTopic(inputTopic); err != nil {
 		panic(err)
 	}
+	exe.DeleteTopic(clientTopic)
+	if err := exe.CreateTopic(clientTopic); err != nil {
+		panic(err)
+	}
 }
 
 func testSyncServer(syncServerCmdChan chan error, userID, since, want string) {
@@ -98,6 +98,8 @@ type Dendrite struct {
 		Topics struct {
 			// Topic for roomserver/api.OutputRoomEvent events.
 			OutputRoomEvent Topic `yaml:"output_room_event"`
+			// Topic for sending account data from client API to sync API
+			OutputClientData Topic `yaml:"output_client_data"`
 			// Topic for user updates (profile, presence)
 			UserUpdates Topic `yaml:"user_updates"`
 		}

@@ -298,6 +300,7 @@ func (config *Dendrite) check() error {
 
 	checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses)))
 	checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
+	checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData))
 	checkNotEmpty("database.account", string(config.Database.Account))
 	checkNotEmpty("database.device", string(config.Database.Device))
 	checkNotEmpty("database.server_key", string(config.Database.ServerKey))
@@ -44,6 +44,7 @@ kafka:
   topics:
     input_room_event: input.room
     output_room_event: output.room
+    output_client_data: output.client
 database:
   media_api: "postgresql:///media_api"
   account: "postgresql:///account"
@@ -82,6 +82,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
 	// TODO: Different servers should be using different topics.
 	// Make this configurable somehow?
 	cfg.Kafka.Topics.OutputRoomEvent = "test.room.output"
+	cfg.Kafka.Topics.OutputClientData = "test.clientapi.output"
 
 	// TODO: Use different databases for the different schemas.
 	// Using the same database for every schema currently works because
@@ -0,0 +1,22 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+// AccountData represents account data sent from the client API server to the
+// sync API server
+type AccountData struct {
+	RoomID string `json:"room_id"`
+	Type   string `json:"type"`
+}
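A small sketch (not part of the commit) of what actually travels over the new Kafka topic: the JSON-encoded AccountData payload, while the user ID is carried separately as the message key.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/matrix-org/dendrite/common"
)

func main() {
	// The user ID is not part of the payload: the producer sends it as the
	// Kafka message key, and the consumer reads it back from msg.Key.
	payload := common.AccountData{
		RoomID: "!someroom:localhost", // empty string for global (non-room) account data
		Type:   "m.fully_read",
	}

	value, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	// Prints: {"room_id":"!someroom:localhost","type":"m.fully_read"}
	fmt.Println(string(value))
}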
@@ -0,0 +1,91 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+	"encoding/json"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/matrix-org/dendrite/common"
+	"github.com/matrix-org/dendrite/common/config"
+	"github.com/matrix-org/dendrite/syncapi/storage"
+	"github.com/matrix-org/dendrite/syncapi/sync"
+	sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// OutputClientData consumes events that originated in the client API server.
+type OutputClientData struct {
+	clientAPIConsumer *common.ContinualConsumer
+	db                *storage.SyncServerDatabase
+	notifier          *sync.Notifier
+}
+
+// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
+func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) {
+	kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	consumer := common.ContinualConsumer{
+		Topic:          string(cfg.Kafka.Topics.OutputClientData),
+		Consumer:       kafkaConsumer,
+		PartitionStore: store,
+	}
+	s := &OutputClientData{
+		clientAPIConsumer: &consumer,
+		db:                store,
+		notifier:          n,
+	}
+	consumer.ProcessMessage = s.onMessage
+
+	return s, nil
+}
+
+// Start consuming from room servers
+func (s *OutputClientData) Start() error {
+	return s.clientAPIConsumer.Start()
+}
+
+// onMessage is called when the sync server receives a new event from the client API server output log.
+// It is not safe for this function to be called from multiple goroutines, or else the
+// sync stream position may race and be incorrectly calculated.
+func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error {
+	// Parse out the event JSON
+	var output common.AccountData
+	if err := json.Unmarshal(msg.Value, &output); err != nil {
+		// If the message was invalid, log it and move on to the next message in the stream
+		log.WithError(err).Errorf("client API server output log: message parse failure")
+		return nil
+	}
+
+	log.WithFields(log.Fields{
+		"type":    output.Type,
+		"room_id": output.RoomID,
+	}).Info("received data from client API server")
+
+	syncStreamPos, err := s.db.UpsertAccountData(string(msg.Key), output.RoomID, output.Type)
+	if err != nil {
+		log.WithFields(log.Fields{
+			"type":       output.Type,
+			"room_id":    output.RoomID,
+			log.ErrorKey: err,
+		}).Panicf("could not save account data")
+	}
+
+	s.notifier.OnNewEvent(nil, string(msg.Key), syncStreamPos)
+
+	return nil
+}
@@ -35,12 +35,9 @@ type OutputRoomEvent struct {
 	db       *storage.SyncServerDatabase
 	notifier *sync.Notifier
 	query    api.RoomserverQueryAPI
-	serverName gomatrixserverlib.ServerName
-	keyID      gomatrixserverlib.KeyID
-	privateKey []byte
 }
 
-type prevMembership struct {
+type prevEventRef struct {
 	PrevContent json.RawMessage `json:"prev_content"`
 	PrevID      string          `json:"replaces_state"`
 	UserID      string          `json:"prev_sender"`

@@ -64,9 +61,6 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
 		db:       store,
 		notifier: n,
 		query:    api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
-		serverName: cfg.Matrix.ServerName,
-		keyID:      cfg.Matrix.KeyID,
-		privateKey: cfg.Matrix.PrivateKey,
 	}
 	consumer.ProcessMessage = s.onMessage
 

@@ -113,13 +107,13 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
 		}).Panicf("roomserver output log: state event lookup failure")
 	}
 
-	ev, err = s.updateStateEvent(ev, s.keyID, s.privateKey)
+	ev, err = s.updateStateEvent(ev)
 	if err != nil {
 		return err
 	}
 
 	for i := range addsStateEvents {
-		addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i], s.keyID, s.privateKey)
+		addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
 		if err != nil {
 			return err
 		}

@@ -139,7 +133,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
 		}).Panicf("roomserver output log: write event failure")
 		return nil
 	}
-	s.notifier.OnNewEvent(&ev, types.StreamPosition(syncStreamPos))
+	s.notifier.OnNewEvent(&ev, "", types.StreamPosition(syncStreamPos))
 
 	return nil
 }

@@ -201,10 +195,7 @@ func (s *OutputRoomEvent) lookupStateEvents(
 	return result, nil
 }
 
-func (s *OutputRoomEvent) updateStateEvent(
-	event gomatrixserverlib.Event, keyID gomatrixserverlib.KeyID,
-	privateKey []byte,
-) (gomatrixserverlib.Event, error) {
+func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
 	var stateKey string
 	if event.StateKey() == nil {
 		stateKey = ""

@@ -221,7 +212,7 @@ func (s *OutputRoomEvent) updateStateEvent(
 		return event, nil
 	}
 
-	prev := prevMembership{
+	prev := prevEventRef{
 		PrevContent: prevEvent.Content(),
 		PrevID:      prevEvent.EventID(),
 		UserID:      prevEvent.Sender(),
@@ -0,0 +1,113 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"database/sql"
+
+	"github.com/matrix-org/dendrite/syncapi/types"
+)
+
+const accountDataSchema = `
+-- Stores the users account data
+CREATE TABLE IF NOT EXISTS account_data_type (
+    -- The highest numeric ID from the output_room_events at the time of saving the data
+    id BIGINT,
+    -- ID of the user the data belongs to
+    user_id TEXT NOT NULL,
+    -- ID of the room the data is related to (empty string if not related to a specific room)
+    room_id TEXT NOT NULL,
+    -- Type of the data
+    type TEXT NOT NULL,
+
+    PRIMARY KEY(user_id, room_id, type),
+
+    -- We don't want two entries of the same type for the same user
+    CONSTRAINT account_data_unique UNIQUE (user_id, room_id, type)
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS account_data_id_idx ON account_data_type(id);
+`
+
+const insertAccountDataSQL = "" +
+	"INSERT INTO account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
+	" ON CONFLICT ON CONSTRAINT account_data_unique" +
+	" DO UPDATE SET id = EXCLUDED.id"
+
+const selectAccountDataInRangeSQL = "" +
+	"SELECT room_id, type FROM account_data_type" +
+	" WHERE user_id = $1 AND id > $2 AND id <= $3" +
+	" ORDER BY id ASC"
+
+type accountDataStatements struct {
+	insertAccountDataStmt        *sql.Stmt
+	selectAccountDataInRangeStmt *sql.Stmt
+}
+
+func (s *accountDataStatements) prepare(db *sql.DB) (err error) {
+	_, err = db.Exec(accountDataSchema)
+	if err != nil {
+		return
+	}
+	if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil {
+		return
+	}
+	if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil {
+		return
+	}
+	return
+}
+
+func (s *accountDataStatements) insertAccountData(
+	pos types.StreamPosition, userID string, roomID string, dataType string,
+) (err error) {
+	_, err = s.insertAccountDataStmt.Exec(pos, userID, roomID, dataType)
+	return
+}
+
+func (s *accountDataStatements) selectAccountDataInRange(
+	userID string, oldPos types.StreamPosition, newPos types.StreamPosition,
+) (data map[string][]string, err error) {
+	data = make(map[string][]string)
+
+	// If both positions are the same, it means that the data was saved after the
+	// latest room event. In that case, we need to decrement the old position as
+	// it would prevent the SQL request from returning anything.
+	if oldPos == newPos {
+		oldPos--
+	}
+
+	rows, err := s.selectAccountDataInRangeStmt.Query(userID, oldPos, newPos)
+	if err != nil {
+		return
+	}
+
+	for rows.Next() {
+		var dataType string
+		var roomID string
+
+		if err = rows.Scan(&roomID, &dataType); err != nil {
+			return
+		}
+
+		if len(data[roomID]) > 0 {
+			data[roomID] = append(data[roomID], dataType)
+		} else {
+			data[roomID] = []string{dataType}
+		}
+	}
+
+	return
+}
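To illustrate the upsert semantics of insertAccountDataSQL, a hedged standalone sketch using database/sql with the lib/pq driver; the connection string is a placeholder and the table is assumed to already exist (created by the prepare() step above). Writing the same (user, room, type) twice keeps a single row and only advances its stream position id.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

const upsertSQL = "INSERT INTO account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" +
	" ON CONFLICT ON CONSTRAINT account_data_unique" +
	" DO UPDATE SET id = EXCLUDED.id"

func main() {
	// Placeholder DSN for a database where the sync server schema exists.
	db, err := sql.Open("postgres", "postgresql:///syncapi?sslmode=disable")
	if err != nil {
		panic(err)
	}

	// First write records the data type at stream position 3...
	if _, err = db.Exec(upsertSQL, 3, "@alice:localhost", "", "m.push_rules"); err != nil {
		panic(err)
	}
	// ...and a later write for the same (user, room, type) only bumps the position to 7.
	if _, err = db.Exec(upsertSQL, 7, "@alice:localhost", "", "m.push_rules"); err != nil {
		panic(err)
	}

	var id int64
	if err = db.QueryRow(
		"SELECT id FROM account_data_type WHERE user_id = $1 AND room_id = $2 AND type = $3",
		"@alice:localhost", "", "m.push_rules",
	).Scan(&id); err != nil {
		panic(err)
	}
	fmt.Println("stream position now:", id) // 7
}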
@@ -43,6 +43,7 @@ type streamEvent struct {
 type SyncServerDatabase struct {
 	db          *sql.DB
 	partitions  common.PartitionOffsetStatements
+	accountData accountDataStatements
 	events      outputRoomEventsStatements
 	roomstate   currentRoomStateStatements
 }

@@ -58,6 +59,10 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
 	if err = partitions.Prepare(db); err != nil {
 		return nil, err
 	}
+	accountData := accountDataStatements{}
+	if err = accountData.prepare(db); err != nil {
+		return nil, err
+	}
 	events := outputRoomEventsStatements{}
 	if err = events.prepare(db); err != nil {
 		return nil, err

@@ -66,7 +71,7 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
 	if err := state.prepare(db); err != nil {
 		return nil, err
 	}
-	return &SyncServerDatabase{db, partitions, events, state}, nil
+	return &SyncServerDatabase{db, partitions, accountData, events, state}, nil
 }
 
 // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.

@@ -274,6 +279,33 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
 	return
 }
 
+// GetAccountDataInRange returns all account data for a given user inserted or
+// updated between two given positions
+// Returns a map following the format data[roomID] = []dataTypes
+// If no data is retrieved, returns an empty map
+// If there was an issue with the retrieval, returns an error
+func (d *SyncServerDatabase) GetAccountDataInRange(
+	userID string, oldPos types.StreamPosition, newPos types.StreamPosition,
+) (map[string][]string, error) {
+	return d.accountData.selectAccountDataInRange(userID, oldPos, newPos)
+}
+
+// UpsertAccountData keeps track of new or updated account data, by saving the type
+// of the new/updated data, and the user ID and room ID the data is related to (empty
+// room ID means the data isn't specific to any room)
+// If no data with the given type, user ID and room ID exists in the database,
+// creates a new row, else update the existing one
+// Returns an error if there was an issue with the upsert
+func (d *SyncServerDatabase) UpsertAccountData(userID string, roomID string, dataType string) (types.StreamPosition, error) {
+	pos, err := d.SyncStreamPosition()
+	if err != nil {
+		return pos, err
+	}
+
+	err = d.accountData.insertAccountData(pos, userID, roomID, dataType)
+	return pos, err
+}
+
 func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error {
 	// Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark.
 	roomIDs, err := d.roomstate.selectRoomIDsWithMembership(txn, userID, "invite")
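A usage sketch of the two new database methods (not part of the commit); the connection string and user ID are placeholders. UpsertAccountData tags the change with the current sync stream position, and GetAccountDataInRange later returns only the data types that changed inside a position window.

package main

import (
	"fmt"

	"github.com/matrix-org/dendrite/syncapi/storage"
)

func main() {
	// Placeholder DSN.
	db, err := storage.NewSyncServerDatabase("postgresql:///syncapi?sslmode=disable")
	if err != nil {
		panic(err)
	}

	// Record that @alice's push rules changed; the returned position is the
	// stream position the change is tagged with.
	pos, err := db.UpsertAccountData("@alice:localhost", "", "m.push_rules")
	if err != nil {
		panic(err)
	}

	// Later, an incremental /sync asks only for what changed since a previous position.
	since := pos - 1 // placeholder "previous" position
	changed, err := db.GetAccountDataInRange("@alice:localhost", since, pos)
	if err != nil {
		panic(err)
	}
	// changed maps room ID ("" for global data) to the list of changed data types,
	// e.g. map[:[m.push_rules]]
	fmt.Println(changed)
}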
@@ -54,13 +54,15 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
 // OnNewEvent is called when a new event is received from the room server. Must only be
 // called from a single goroutine, to avoid races between updates which could set the
 // current position in the stream incorrectly.
-func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) {
+// Can be called either with a *gomatrixserverlib.Event, or with an user ID
+func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) {
 	// update the current position then notify relevant /sync streams.
 	// This needs to be done PRIOR to waking up users as they will read this value.
 	n.streamLock.Lock()
 	defer n.streamLock.Unlock()
 	n.currPos = pos
 
+	if ev != nil {
 	// Map this event's room_id to a list of joined users, and wake them up.
 	userIDs := n.joinedUsers(ev.RoomID())
 	// If this is an invite, also add in the invitee to this list.

@@ -89,6 +91,9 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosit
 	for _, userID := range userIDs {
 		n.wakeupUser(userID, pos)
 	}
+	} else if len(userID) > 0 {
+		n.wakeupUser(userID, pos)
+	}
 }
 
 // WaitForEvents blocks until there are new events for this request.
@@ -123,7 +123,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
 	stream := n.fetchUserStream(bob, true)
 	waitForBlocking(stream, 1)
 
-	n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
+	n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
 
 	wg.Wait()
 }

@@ -151,7 +151,7 @@ func TestNewInviteEventForUser(t *testing.T) {
 	stream := n.fetchUserStream(bob, true)
 	waitForBlocking(stream, 1)
 
-	n.OnNewEvent(&aliceInviteBobEvent, streamPositionAfter)
+	n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
 
 	wg.Wait()
 }

@@ -182,7 +182,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
 	stream := n.fetchUserStream(bob, true)
 	waitForBlocking(stream, 3)
 
-	n.OnNewEvent(&randomMessageEvent, streamPositionAfter)
+	n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
 
 	wg.Wait()
 

@@ -217,7 +217,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
 	}()
 	bobStream := n.fetchUserStream(bob, true)
 	waitForBlocking(bobStream, 1)
-	n.OnNewEvent(&bobLeaveEvent, streamPositionAfter)
+	n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
 	leaveWG.Wait()
 
 	// send an event into the room. Make sure alice gets it. Bob should not.

@@ -246,7 +246,7 @@
 	waitForBlocking(aliceStream, 1)
 	waitForBlocking(bobStream, 1)
 
-	n.OnNewEvent(&randomMessageEvent, streamPositionAfter2)
+	n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
 	aliceWG.Wait()
 
 	// it's possible that at this point alice has been informed and bob is about to be informed, so wait
@@ -80,7 +80,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
 	if err != nil {
 		res = httputil.LogThenError(req, err)
 	} else {
-		syncData, err = rp.appendAccountData(syncData, device.UserID)
+		syncData, err = rp.appendAccountData(syncData, device.UserID, *syncReq, currentPos)
 		if err != nil {
 			res = httputil.LogThenError(req, err)
 		} else {

@@ -113,7 +113,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre
 	return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
 }
 
-func (rp *RequestPool) appendAccountData(data *types.Response, userID string) (*types.Response, error) {
+func (rp *RequestPool) appendAccountData(
+	data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
+) (*types.Response, error) {
 	// TODO: We currently send all account data on every sync response, we should instead send data
 	// that has changed on incremental sync responses
 	localpart, _, err := gomatrixserverlib.SplitID('@', userID)

@@ -121,7 +123,12 @@ func (rp *RequestPool) appendAccountData(data *types.Response, userID string) (*
 		return nil, err
 	}
 
-	global, rooms, err := rp.accountDB.GetAccountData(localpart)
+	if req.since == types.StreamPosition(0) {
+		// If this is the initial sync, we don't need to check if a data has
+		// already been sent. Instead, we send the whole batch.
+		var global []gomatrixserverlib.ClientEvent
+		var rooms map[string][]gomatrixserverlib.ClientEvent
+		global, rooms, err = rp.accountDB.GetAccountData(localpart)
 	if err != nil {
 		return nil, err
 	}

@@ -134,5 +141,40 @@ func (rp *RequestPool) appendAccountData(data *types.Response, userID string) (*
 		}
 	}
 
+		return data, nil
+	}
+
+	// Sync is not initial, get all account data since the latest sync
+	dataTypes, err := rp.db.GetAccountDataInRange(userID, req.since, currentPos)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(dataTypes) == 0 {
+		return data, nil
+	}
+
+	// Iterate over the rooms
+	for roomID, dataTypes := range dataTypes {
+		events := []gomatrixserverlib.ClientEvent{}
+		// Request the missing data from the database
+		for _, dataType := range dataTypes {
+			evs, err := rp.accountDB.GetAccountDataByType(localpart, roomID, dataType)
+			if err != nil {
+				return nil, err
+			}
+			events = append(events, evs...)
+		}
+
+		// Append the data to the response
+		if len(roomID) > 0 {
+			jr := data.Rooms.Join[roomID]
+			jr.AccountData.Events = events
+			data.Rooms.Join[roomID] = jr
+		} else {
+			data.AccountData.Events = events
+		}
+	}
+
 	return data, nil
 }
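To show where the incremental data ends up, a self-contained sketch (not part of the commit) that uses simplified stand-in types rather than the real types.Response; the real code assigns to data.AccountData.Events for global data and to data.Rooms.Join[roomID].AccountData.Events for per-room data.

package main

import "fmt"

// Simplified stand-ins for the relevant parts of the sync response type.
type eventList struct{ Events []string }

type joinedRoom struct{ AccountData eventList }

type response struct {
	AccountData eventList
	Rooms       struct{ Join map[string]joinedRoom }
}

func main() {
	// What GetAccountDataInRange might return for an incremental sync:
	// "" means global account data, a room ID means per-room account data.
	changed := map[string][]string{
		"":                    {"m.push_rules"},
		"!someroom:localhost": {"m.fully_read"},
	}

	res := response{}
	res.Rooms.Join = make(map[string]joinedRoom)

	for roomID, dataTypes := range changed {
		if roomID == "" {
			res.AccountData.Events = append(res.AccountData.Events, dataTypes...)
		} else {
			jr := res.Rooms.Join[roomID]
			jr.AccountData.Events = append(jr.AccountData.Events, dataTypes...)
			res.Rooms.Join[roomID] = jr
		}
	}
	fmt.Printf("%+v\n", res)
}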