Set up queryAPI in federationsender (#791)
parent
43308d2f3f
commit
5e25f6ba22
|
@ -15,11 +15,15 @@
|
||||||
package federationsender
|
package federationsender
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||||
|
"github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/federationsender/consumers"
|
"github.com/matrix-org/dendrite/federationsender/consumers"
|
||||||
|
"github.com/matrix-org/dendrite/federationsender/query"
|
||||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -29,8 +33,8 @@ import (
|
||||||
func SetupFederationSenderComponent(
|
func SetupFederationSenderComponent(
|
||||||
base *basecomponent.BaseDendrite,
|
base *basecomponent.BaseDendrite,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
rsQueryAPI roomserverAPI.RoomserverQueryAPI,
|
||||||
) {
|
) api.FederationSenderQueryAPI {
|
||||||
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
|
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to connect to federation sender db")
|
logrus.WithError(err).Panic("failed to connect to federation sender db")
|
||||||
|
@ -40,7 +44,7 @@ func SetupFederationSenderComponent(
|
||||||
|
|
||||||
rsConsumer := consumers.NewOutputRoomEventConsumer(
|
rsConsumer := consumers.NewOutputRoomEventConsumer(
|
||||||
base.Cfg, base.KafkaConsumer, queues,
|
base.Cfg, base.KafkaConsumer, queues,
|
||||||
federationSenderDB, queryAPI,
|
federationSenderDB, rsQueryAPI,
|
||||||
)
|
)
|
||||||
if err = rsConsumer.Start(); err != nil {
|
if err = rsConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start room server consumer")
|
logrus.WithError(err).Panic("failed to start room server consumer")
|
||||||
|
@ -52,4 +56,11 @@ func SetupFederationSenderComponent(
|
||||||
if err := tsConsumer.Start(); err != nil {
|
if err := tsConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start typing server consumer")
|
logrus.WithError(err).Panic("failed to start typing server consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queryAPI := query.FederationSenderQueryAPI{
|
||||||
|
DB: federationSenderDB,
|
||||||
|
}
|
||||||
|
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||||
|
|
||||||
|
return &queryAPI
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,14 @@ package query
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/federationsender/api"
|
"github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/federationsender/types"
|
"github.com/matrix-org/dendrite/federationsender/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FederationSenderQueryDatabase has the APIs needed to implement the query API.
|
// FederationSenderQueryDatabase has the APIs needed to implement the query API.
|
||||||
|
@ -53,3 +57,35 @@ func (f *FederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom(
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetupHTTP adds the FederationSenderQueryAPI handlers to the http.ServeMux.
|
||||||
|
func (f *FederationSenderQueryAPI) SetupHTTP(servMux *http.ServeMux) {
|
||||||
|
servMux.Handle(
|
||||||
|
api.FederationSenderQueryJoinedHostsInRoomPath,
|
||||||
|
common.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request api.QueryJoinedHostsInRoomRequest
|
||||||
|
var response api.QueryJoinedHostsInRoomResponse
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
if err := f.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
servMux.Handle(
|
||||||
|
api.FederationSenderQueryJoinedHostServerNamesInRoomPath,
|
||||||
|
common.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request api.QueryJoinedHostServerNamesInRoomRequest
|
||||||
|
var response api.QueryJoinedHostServerNamesInRoomResponse
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
if err := f.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue