From 37d117f2b7db9d04c0d6ee0e71baaad8d6680ddd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 24 Jan 2020 17:11:20 +0000 Subject: [PATCH] Wire up publicroomsapi for roomserver events (#851) * Wire up publicroomsapi to roomserver events * Remove parameter that was incorrectly brought over from p2p work * nolint containsBackwardExtremity for now --- cmd/dendrite-monolith-server/main.go | 2 +- cmd/dendrite-public-rooms-api-server/main.go | 4 +++- publicroomsapi/publicroomsapi.go | 10 ++++++++++ publicroomsapi/storage/postgres/storage.go | 11 ++++++----- syncapi/routing/messages.go | 5 +++++ 5 files changed, 25 insertions(+), 7 deletions(-) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 7515ec5c..b3de9add 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -69,7 +69,7 @@ func main() { ) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) mediaapi.SetupMediaAPIComponent(base, deviceDB) - publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) + publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) httpHandler := common.WrapHandlerInCORS(base.APIMux) diff --git a/cmd/dendrite-public-rooms-api-server/main.go b/cmd/dendrite-public-rooms-api-server/main.go index b60eed92..f8bd8b06 100644 --- a/cmd/dendrite-public-rooms-api-server/main.go +++ b/cmd/dendrite-public-rooms-api-server/main.go @@ -26,7 +26,9 @@ func main() { deviceDB := base.CreateDeviceDB() - publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) + _, _, query := base.CreateHTTPRoomserverAPIs() + + publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query) base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI)) diff --git a/publicroomsapi/publicroomsapi.go b/publicroomsapi/publicroomsapi.go index cf9ac00e..181966d3 100644 --- a/publicroomsapi/publicroomsapi.go +++ b/publicroomsapi/publicroomsapi.go @@ -17,8 +17,10 @@ package publicroomsapi import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/publicroomsapi/consumers" "github.com/matrix-org/dendrite/publicroomsapi/routing" "github.com/matrix-org/dendrite/publicroomsapi/storage" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/sirupsen/logrus" ) @@ -27,11 +29,19 @@ import ( func SetupPublicRoomsAPIComponent( base *basecomponent.BaseDendrite, deviceDB *devices.Database, + rsQueryAPI roomserverAPI.RoomserverQueryAPI, ) { publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") } + rsConsumer := consumers.NewOutputRoomEventConsumer( + base.Cfg, base.KafkaConsumer, publicRoomsDB, rsQueryAPI, + ) + if err = rsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start public rooms server consumer") + } + routing.Setup(base.APIMux, deviceDB, publicRoomsDB) } diff --git a/publicroomsapi/storage/postgres/storage.go b/publicroomsapi/storage/postgres/storage.go index 67b5efc3..5365c766 100644 --- a/publicroomsapi/storage/postgres/storage.go +++ b/publicroomsapi/storage/postgres/storage.go @@ -42,15 +42,16 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (*PublicRoomsServerData if db, err = sql.Open("postgres", dataSourceName); err != nil { return nil, err } - partitions := common.PartitionOffsetStatements{} - if err = partitions.Prepare(db, "publicroomsapi"); err != nil { + storage := PublicRoomsServerDatabase{ + db: db, + } + if err = storage.PartitionOffsetStatements.Prepare(db, "publicroomsapi"); err != nil { return nil, err } - statements := publicRoomsStatements{} - if err = statements.prepare(db); err != nil { + if err = storage.statements.prepare(db); err != nil { return nil, err } - return &PublicRoomsServerDatabase{db, partitions, statements}, nil + return &storage, nil } // GetRoomVisibility returns the room visibility as a boolean: true if the room diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index eb678296..4fac2ba2 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -344,6 +344,11 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent // considers the event itself a backward extremity if at least one of the parent // events doesn't exist in the database. // Returns an error if there was an issue with talking to the database. +// +// This function is unused but currently set to nolint for now until we are +// absolutely sure that the changes in matrix-org/dendrite#847 are behaving +// properly. +// nolint:unused func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) { // Select the earliest retrieved event. var ev *types.StreamEvent