Refactor how federationsender gets created (#1095)

* Refactor how federationsender gets created

* s/httpint/inthttp/ for alphabetical niceness with internal package
main
Kegsay 2020-06-04 14:27:10 +01:00 committed by GitHub
parent f7025d3499
commit f4c676ccdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 288 additions and 303 deletions

View File

@ -37,7 +37,7 @@ func main() {
asQuery := base.CreateHTTPAppServiceAPIs() asQuery := base.CreateHTTPAppServiceAPIs()
rsAPI := base.CreateHTTPRoomserverAPIs() rsAPI := base.CreateHTTPRoomserverAPIs()
fsAPI := base.CreateHTTPFederationSenderAPIs() fsAPI := base.FederationSenderHTTPClient()
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB)

View File

@ -178,7 +178,7 @@ func main() {
base.Base.PublicAPIMux, base.Base.PublicAPIMux,
base.Base.InternalAPIMux, base.Base.InternalAPIMux,
&cfg, &cfg,
base.Base.EnableHTTPAPIs, base.Base.UseHTTPAPIs,
) )
// Expose the matrix APIs directly rather than putting them under a /api path. // Expose the matrix APIs directly rather than putting them under a /api path.

View File

@ -34,7 +34,7 @@ func main() {
serverKeyAPI := base.CreateHTTPServerKeyAPIs() serverKeyAPI := base.CreateHTTPServerKeyAPIs()
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
fsAPI := base.CreateHTTPFederationSenderAPIs() fsAPI := base.FederationSenderHTTPClient()
rsAPI := base.CreateHTTPRoomserverAPIs() rsAPI := base.CreateHTTPRoomserverAPIs()
asAPI := base.CreateHTTPAppServiceAPIs() asAPI := base.CreateHTTPAppServiceAPIs()

View File

@ -73,7 +73,7 @@ func main() {
serverKeyAPI := serverkeyapi.SetupServerKeyAPIComponent( serverKeyAPI := serverkeyapi.SetupServerKeyAPIComponent(
base, federation, base, federation,
) )
if base.EnableHTTPAPIs { if base.UseHTTPAPIs {
serverKeyAPI = base.CreateHTTPServerKeyAPIs() serverKeyAPI = base.CreateHTTPServerKeyAPIs()
} }
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
@ -82,29 +82,29 @@ func main() {
base, keyRing, federation, base, keyRing, federation,
) )
rsAPI := rsComponent rsAPI := rsComponent
if base.EnableHTTPAPIs { if base.UseHTTPAPIs {
rsAPI = base.CreateHTTPRoomserverAPIs() rsAPI = base.CreateHTTPRoomserverAPIs()
} }
eduInputAPI := eduserver.SetupEDUServerComponent( eduInputAPI := eduserver.SetupEDUServerComponent(
base, cache.New(), deviceDB, base, cache.New(), deviceDB,
) )
if base.EnableHTTPAPIs { if base.UseHTTPAPIs {
eduInputAPI = base.CreateHTTPEDUServerAPIs() eduInputAPI = base.CreateHTTPEDUServerAPIs()
} }
asAPI := appservice.SetupAppServiceAPIComponent( asAPI := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, rsAPI, transactions.New(), base, accountDB, deviceDB, federation, rsAPI, transactions.New(),
) )
if base.EnableHTTPAPIs { if base.UseHTTPAPIs {
asAPI = base.CreateHTTPAppServiceAPIs() asAPI = base.CreateHTTPAppServiceAPIs()
} }
fsAPI := federationsender.SetupFederationSenderComponent( fsAPI := federationsender.SetupFederationSenderComponent(
base, federation, rsAPI, keyRing, base, federation, rsAPI, keyRing,
) )
if base.EnableHTTPAPIs { if base.UseHTTPAPIs {
fsAPI = base.CreateHTTPFederationSenderAPIs() fsAPI = base.FederationSenderHTTPClient()
} }
rsComponent.SetFederationSenderAPI(fsAPI) rsComponent.SetFederationSenderAPI(fsAPI)
@ -132,7 +132,7 @@ func main() {
base.PublicAPIMux, base.PublicAPIMux,
base.InternalAPIMux, base.InternalAPIMux,
cfg, cfg,
base.EnableHTTPAPIs, base.UseHTTPAPIs,
) )
// Expose the matrix APIs directly rather than putting them under a /api path. // Expose the matrix APIs directly rather than putting them under a /api path.

View File

@ -28,7 +28,7 @@ func main() {
deviceDB := base.CreateDeviceDB() deviceDB := base.CreateDeviceDB()
fsAPI := base.CreateHTTPFederationSenderAPIs() fsAPI := base.FederationSenderHTTPClient()
rsAPI := base.CreateHTTPRoomserverAPIs() rsAPI := base.CreateHTTPRoomserverAPIs()
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)

View File

@ -28,7 +28,7 @@ func main() {
serverKeyAPI := base.CreateHTTPServerKeyAPIs() serverKeyAPI := base.CreateHTTPServerKeyAPIs()
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
fsAPI := base.CreateHTTPFederationSenderAPIs() fsAPI := base.FederationSenderHTTPClient()
rsAPI := roomserver.SetupRoomServerComponent(base, keyRing, federation) rsAPI := roomserver.SetupRoomServerComponent(base, keyRing, federation)
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)

View File

@ -235,7 +235,7 @@ func main() {
base.PublicAPIMux, base.PublicAPIMux,
base.InternalAPIMux, base.InternalAPIMux,
cfg, cfg,
base.EnableHTTPAPIs, base.UseHTTPAPIs,
) )
// Expose the matrix APIs via libp2p-js - for federation traffic // Expose the matrix APIs via libp2p-js - for federation traffic

View File

@ -2,8 +2,9 @@ package api
import ( import (
"context" "context"
"errors"
"net/http" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib"
) )
// FederationSenderInternalAPI is used to query information from the federation sender. // FederationSenderInternalAPI is used to query information from the federation sender.
@ -50,16 +51,59 @@ type FederationSenderInternalAPI interface {
) error ) error
} }
// NewFederationSenderInternalAPIHTTP creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. type PerformDirectoryLookupRequest struct {
// If httpClient is nil an error is returned RoomAlias string `json:"room_alias"`
func NewFederationSenderInternalAPIHTTP(federationSenderURL string, httpClient *http.Client) (FederationSenderInternalAPI, error) { ServerName gomatrixserverlib.ServerName `json:"server_name"`
if httpClient == nil {
return nil, errors.New("NewFederationSenderInternalAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderInternalAPI{federationSenderURL, httpClient}, nil
} }
type httpFederationSenderInternalAPI struct { type PerformDirectoryLookupResponse struct {
federationSenderURL string RoomID string `json:"room_id"`
httpClient *http.Client ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
type PerformJoinRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
Content map[string]interface{} `json:"content"`
}
type PerformJoinResponse struct {
}
type PerformLeaveRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
ServerNames types.ServerNames `json:"server_names"`
}
type PerformLeaveResponse struct {
}
type PerformServersAliveRequest struct {
Servers []gomatrixserverlib.ServerName
}
type PerformServersAliveResponse struct {
}
// QueryJoinedHostsInRoomRequest is a request to QueryJoinedHostsInRoom
type QueryJoinedHostsInRoomRequest struct {
RoomID string `json:"room_id"`
}
// QueryJoinedHostsInRoomResponse is a response to QueryJoinedHostsInRoom
type QueryJoinedHostsInRoomResponse struct {
JoinedHosts []types.JoinedHost `json:"joined_hosts"`
}
// QueryJoinedHostServerNamesRequest is a request to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomRequest struct {
RoomID string `json:"room_id"`
}
// QueryJoinedHostServerNamesResponse is a response to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomResponse struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
} }

View File

@ -1,112 +0,0 @@
package api
import (
"context"
"github.com/matrix-org/dendrite/federationsender/types"
internalHTTP "github.com/matrix-org/dendrite/internal/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
const (
// FederationSenderPerformJoinRequestPath is the HTTP path for the PerformJoinRequest API.
FederationSenderPerformDirectoryLookupRequestPath = "/federationsender/performDirectoryLookup"
// FederationSenderPerformJoinRequestPath is the HTTP path for the PerformJoinRequest API.
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
// FederationSenderPerformLeaveRequestPath is the HTTP path for the PerformLeaveRequest API.
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
// FederationSenderPerformServersAlivePath is the HTTP path for the PerformServersAlive API.
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
)
type PerformDirectoryLookupRequest struct {
RoomAlias string `json:"room_alias"`
ServerName gomatrixserverlib.ServerName `json:"server_name"`
}
type PerformDirectoryLookupResponse struct {
RoomID string `json:"room_id"`
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
// Handle an instruction to make_join & send_join with a remote server.
func (h *httpFederationSenderInternalAPI) PerformDirectoryLookup(
ctx context.Context,
request *PerformDirectoryLookupRequest,
response *PerformDirectoryLookupResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDirectoryLookup")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformDirectoryLookupRequestPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
type PerformJoinRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
Content map[string]interface{} `json:"content"`
}
type PerformJoinResponse struct {
}
// Handle an instruction to make_join & send_join with a remote server.
func (h *httpFederationSenderInternalAPI) PerformJoin(
ctx context.Context,
request *PerformJoinRequest,
response *PerformJoinResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformJoinRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformJoinRequestPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
type PerformLeaveRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
ServerNames types.ServerNames `json:"server_names"`
}
type PerformLeaveResponse struct {
}
// Handle an instruction to make_leave & send_leave with a remote server.
func (h *httpFederationSenderInternalAPI) PerformLeave(
ctx context.Context,
request *PerformLeaveRequest,
response *PerformLeaveResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformLeaveRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformLeaveRequestPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
type PerformServersAliveRequest struct {
Servers []gomatrixserverlib.ServerName
}
type PerformServersAliveResponse struct {
}
func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,
request *PerformServersAliveRequest,
response *PerformServersAliveResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformServersAlive")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformServersAlivePath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View File

@ -1,63 +0,0 @@
package api
import (
"context"
"github.com/matrix-org/dendrite/federationsender/types"
internalHTTP "github.com/matrix-org/dendrite/internal/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
// FederationSenderQueryJoinedHostsInRoomPath is the HTTP path for the QueryJoinedHostsInRoom API.
const FederationSenderQueryJoinedHostsInRoomPath = "/federationsender/queryJoinedHostsInRoom"
// FederationSenderQueryJoinedHostServerNamesInRoomPath is the HTTP path for the QueryJoinedHostServerNamesInRoom API.
const FederationSenderQueryJoinedHostServerNamesInRoomPath = "/federationsender/queryJoinedHostServerNamesInRoom"
// QueryJoinedHostsInRoomRequest is a request to QueryJoinedHostsInRoom
type QueryJoinedHostsInRoomRequest struct {
RoomID string `json:"room_id"`
}
// QueryJoinedHostsInRoomResponse is a response to QueryJoinedHostsInRoom
type QueryJoinedHostsInRoomResponse struct {
JoinedHosts []types.JoinedHost `json:"joined_hosts"`
}
// QueryJoinedHostsInRoom implements FederationSenderInternalAPI
func (h *httpFederationSenderInternalAPI) QueryJoinedHostsInRoom(
ctx context.Context,
request *QueryJoinedHostsInRoomRequest,
response *QueryJoinedHostsInRoomResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryJoinedHostsInRoom")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderQueryJoinedHostsInRoomPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryJoinedHostServerNamesRequest is a request to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomRequest struct {
RoomID string `json:"room_id"`
}
// QueryJoinedHostServerNamesResponse is a response to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomResponse struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
// QueryJoinedHostServerNamesInRoom implements FederationSenderInternalAPI
func (h *httpFederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *QueryJoinedHostServerNamesInRoomRequest,
response *QueryJoinedHostServerNamesInRoomResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryJoinedHostServerNamesInRoom")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderQueryJoinedHostServerNamesInRoomPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View File

@ -18,6 +18,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/internal" "github.com/matrix-org/dendrite/federationsender/internal"
"github.com/matrix-org/dendrite/federationsender/inthttp"
"github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
@ -65,12 +66,8 @@ func SetupFederationSenderComponent(
logrus.WithError(err).Panic("failed to start typing server consumer") logrus.WithError(err).Panic("failed to start typing server consumer")
} }
queryAPI := internal.NewFederationSenderInternalAPI( queryAPI := internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, statistics, queues)
federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, inthttp.AddRoutes(queryAPI, base.InternalAPIMux)
statistics, queues,
)
queryAPI.SetupHTTP(base.InternalAPIMux)
return queryAPI return queryAPI
} }

View File

@ -1,19 +1,13 @@
package internal package internal
import ( import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI // FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
@ -46,87 +40,3 @@ func NewFederationSenderInternalAPI(
queues: queues, queues: queues,
} }
} }
// SetupHTTP adds the FederationSenderInternalAPI handlers to the http.ServeMux.
func (f *FederationSenderInternalAPI) SetupHTTP(internalAPIMux *mux.Router) {
internalAPIMux.Handle(
api.FederationSenderQueryJoinedHostsInRoomPath,
internal.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostsInRoomRequest
var response api.QueryJoinedHostsInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
api.FederationSenderQueryJoinedHostServerNamesInRoomPath,
internal.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostServerNamesInRoomRequest
var response api.QueryJoinedHostServerNamesInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(api.FederationSenderPerformJoinRequestPath,
internal.MakeInternalAPI("PerformJoinRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformJoinRequest
var response api.PerformJoinResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformJoin(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(api.FederationSenderPerformLeaveRequestPath,
internal.MakeInternalAPI("PerformLeaveRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformLeaveRequest
var response api.PerformLeaveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformLeave(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(api.FederationSenderPerformDirectoryLookupRequestPath,
internal.MakeInternalAPI("PerformDirectoryLookupRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformDirectoryLookupRequest
var response api.PerformDirectoryLookupResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformDirectoryLookup(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(api.FederationSenderPerformServersAlivePath,
internal.MakeInternalAPI("PerformServersAliveRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformServersAliveRequest
var response api.PerformServersAliveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformServersAlive(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View File

@ -0,0 +1,113 @@
package inthttp
import (
"context"
"errors"
"net/http"
"github.com/matrix-org/dendrite/federationsender/api"
internalHTTP "github.com/matrix-org/dendrite/internal/http"
"github.com/opentracing/opentracing-go"
)
// HTTP paths for the internal HTTP API
const (
FederationSenderQueryJoinedHostsInRoomPath = "/federationsender/queryJoinedHostsInRoom"
FederationSenderQueryJoinedHostServerNamesInRoomPath = "/federationsender/queryJoinedHostServerNamesInRoom"
FederationSenderPerformDirectoryLookupRequestPath = "/federationsender/performDirectoryLookup"
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
)
// NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewFederationSenderClient(federationSenderURL string, httpClient *http.Client) (api.FederationSenderInternalAPI, error) {
if httpClient == nil {
return nil, errors.New("NewFederationSenderInternalAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderInternalAPI{federationSenderURL, httpClient}, nil
}
type httpFederationSenderInternalAPI struct {
federationSenderURL string
httpClient *http.Client
}
// Handle an instruction to make_leave & send_leave with a remote server.
func (h *httpFederationSenderInternalAPI) PerformLeave(
ctx context.Context,
request *api.PerformLeaveRequest,
response *api.PerformLeaveResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformLeaveRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformLeaveRequestPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,
request *api.PerformServersAliveRequest,
response *api.PerformServersAliveResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformServersAlive")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformServersAlivePath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryJoinedHostServerNamesInRoom implements FederationSenderInternalAPI
func (h *httpFederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *api.QueryJoinedHostServerNamesInRoomRequest,
response *api.QueryJoinedHostServerNamesInRoomResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryJoinedHostServerNamesInRoom")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderQueryJoinedHostServerNamesInRoomPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryJoinedHostsInRoom implements FederationSenderInternalAPI
func (h *httpFederationSenderInternalAPI) QueryJoinedHostsInRoom(
ctx context.Context,
request *api.QueryJoinedHostsInRoomRequest,
response *api.QueryJoinedHostsInRoomResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryJoinedHostsInRoom")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderQueryJoinedHostsInRoomPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// Handle an instruction to make_join & send_join with a remote server.
func (h *httpFederationSenderInternalAPI) PerformJoin(
ctx context.Context,
request *api.PerformJoinRequest,
response *api.PerformJoinResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformJoinRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformJoinRequestPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// Handle an instruction to make_join & send_join with a remote server.
func (h *httpFederationSenderInternalAPI) PerformDirectoryLookup(
ctx context.Context,
request *api.PerformDirectoryLookupRequest,
response *api.PerformDirectoryLookupResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDirectoryLookup")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformDirectoryLookupRequestPath
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View File

@ -0,0 +1,95 @@
package inthttp
import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/util"
)
// AddRoutes adds the FederationSenderInternalAPI handlers to the http.ServeMux.
func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Router) {
internalAPIMux.Handle(
FederationSenderQueryJoinedHostsInRoomPath,
internal.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostsInRoomRequest
var response api.QueryJoinedHostsInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := intAPI.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
FederationSenderQueryJoinedHostServerNamesInRoomPath,
internal.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostServerNamesInRoomRequest
var response api.QueryJoinedHostServerNamesInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := intAPI.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(FederationSenderPerformJoinRequestPath,
internal.MakeInternalAPI("PerformJoinRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformJoinRequest
var response api.PerformJoinResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := intAPI.PerformJoin(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(FederationSenderPerformLeaveRequestPath,
internal.MakeInternalAPI("PerformLeaveRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformLeaveRequest
var response api.PerformLeaveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := intAPI.PerformLeave(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(FederationSenderPerformDirectoryLookupRequestPath,
internal.MakeInternalAPI("PerformDirectoryLookupRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformDirectoryLookupRequest
var response api.PerformDirectoryLookupResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := intAPI.PerformDirectoryLookup(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(FederationSenderPerformServersAlivePath,
internal.MakeInternalAPI("PerformServersAliveRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformServersAliveRequest
var response api.PerformServersAliveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := intAPI.PerformServersAlive(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View File

@ -38,6 +38,7 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/inthttp"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api" serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api"
@ -58,7 +59,7 @@ type BaseDendrite struct {
// PublicAPIMux should be used to register new public matrix api endpoints // PublicAPIMux should be used to register new public matrix api endpoints
PublicAPIMux *mux.Router PublicAPIMux *mux.Router
InternalAPIMux *mux.Router InternalAPIMux *mux.Router
EnableHTTPAPIs bool UseHTTPAPIs bool
httpClient *http.Client httpClient *http.Client
Cfg *config.Dendrite Cfg *config.Dendrite
ImmutableCache caching.ImmutableCache ImmutableCache caching.ImmutableCache
@ -72,7 +73,7 @@ const HTTPClientTimeout = time.Second * 30
// NewBaseDendrite creates a new instance to be used by a component. // NewBaseDendrite creates a new instance to be used by a component.
// The componentName is used for logging purposes, and should be a friendly name // The componentName is used for logging purposes, and should be a friendly name
// of the compontent running, e.g. "SyncAPI" // of the compontent running, e.g. "SyncAPI"
func NewBaseDendrite(cfg *config.Dendrite, componentName string, enableHTTPAPIs bool) *BaseDendrite { func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs bool) *BaseDendrite {
internal.SetupStdLogging() internal.SetupStdLogging()
internal.SetupHookLogging(cfg.Logging, componentName) internal.SetupHookLogging(cfg.Logging, componentName)
internal.SetupPprof() internal.SetupPprof()
@ -118,7 +119,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, enableHTTPAPIs
return &BaseDendrite{ return &BaseDendrite{
componentName: componentName, componentName: componentName,
EnableHTTPAPIs: enableHTTPAPIs, UseHTTPAPIs: useHTTPAPIs,
tracerCloser: closer, tracerCloser: closer,
Cfg: cfg, Cfg: cfg,
ImmutableCache: cache, ImmutableCache: cache,
@ -165,10 +166,10 @@ func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI
return e return e
} }
// CreateHTTPFederationSenderAPIs returns FederationSenderInternalAPI for hitting // FederationSenderHTTPClient returns FederationSenderInternalAPI for hitting
// the federation sender over HTTP // the federation sender over HTTP
func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderInternalAPI { func (b *BaseDendrite) FederationSenderHTTPClient() federationSenderAPI.FederationSenderInternalAPI {
f, err := federationSenderAPI.NewFederationSenderInternalAPIHTTP(b.Cfg.FederationSenderURL(), b.httpClient) f, err := inthttp.NewFederationSenderClient(b.Cfg.FederationSenderURL(), b.httpClient)
if err != nil { if err != nil {
logrus.WithError(err).Panic("NewFederationSenderInternalAPIHTTP failed", b.httpClient) logrus.WithError(err).Panic("NewFederationSenderInternalAPIHTTP failed", b.httpClient)
} }
@ -241,7 +242,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) {
b.PublicAPIMux, b.PublicAPIMux,
b.InternalAPIMux, b.InternalAPIMux,
b.Cfg, b.Cfg,
b.EnableHTTPAPIs, b.UseHTTPAPIs,
) )
logrus.Infof("Starting %s server on %s", b.componentName, serv.Addr) logrus.Infof("Starting %s server on %s", b.componentName, serv.Addr)