Remove clientapi producers which aren't actually producers (#1111)

* Remove clientapi producers which aren't actually producers

They are actually just convenience wrappers around the internal APIs
for roomserver/eduserver. Move their logic to their respective `api`
packages and call them directly.

* Remove TODO

* unbreak ygg
main
Kegsay 2020-06-10 12:17:54 +01:00 committed by GitHub
parent d9d6f4568c
commit b7187a9a35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 160 additions and 226 deletions

View File

@ -48,9 +48,6 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI, fsAPI federationSenderAPI.FederationSenderInternalAPI,
) { ) {
roomserverProducer := producers.NewRoomserverProducer(rsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
Producer: producer, Producer: producer,
Topic: string(cfg.Kafka.Topics.OutputClientData), Topic: string(cfg.Kafka.Topics.OutputClientData),
@ -64,8 +61,8 @@ func AddPublicRoutes(
} }
routing.Setup( routing.Setup(
router, cfg, roomserverProducer, rsAPI, asAPI, router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing, accountsDB, deviceDB, federation, *keyRing,
syncProducer, eduProducer, transactionsCache, fsAPI, syncProducer, transactionsCache, fsAPI,
) )
} }

View File

@ -29,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
@ -137,21 +136,21 @@ type fledglingEvent struct {
// CreateRoom implements /createRoom // CreateRoom implements /createRoom
func CreateRoom( func CreateRoom(
req *http.Request, device *authtypes.Device, req *http.Request, device *authtypes.Device,
cfg *config.Dendrite, producer *producers.RoomserverProducer, cfg *config.Dendrite,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse { ) util.JSONResponse {
// TODO (#267): Check room ID doesn't clash with an existing one, and we // TODO (#267): Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs? // probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName) roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
return createRoom(req, device, cfg, roomID, producer, accountDB, rsAPI, asAPI) return createRoom(req, device, cfg, roomID, accountDB, rsAPI, asAPI)
} }
// createRoom implements /createRoom // createRoom implements /createRoom
// nolint: gocyclo // nolint: gocyclo
func createRoom( func createRoom(
req *http.Request, device *authtypes.Device, req *http.Request, device *authtypes.Device,
cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer, cfg *config.Dendrite, roomID string,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse { ) util.JSONResponse {
@ -344,9 +343,9 @@ func createRoom(
} }
// send events to the room server // send events to the room server
_, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil) _, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil)
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -404,14 +403,14 @@ func createRoom(
} }
} }
// Send the invite event to the roomserver. // Send the invite event to the roomserver.
if err = producer.SendInvite( if err = roomserverAPI.SendInvite(
req.Context(), req.Context(), rsAPI,
inviteEvent.Headered(roomVersion), inviteEvent.Headered(roomVersion),
strippedState, // invite room state strippedState, // invite room state
cfg.Matrix.ServerName, // send as server cfg.Matrix.ServerName, // send as server
nil, // transaction ID nil, // transaction ID
); err != nil { ); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
} }

View File

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
@ -46,7 +45,6 @@ func SendMembership(
req *http.Request, accountDB accounts.Database, device *authtypes.Device, req *http.Request, accountDB accounts.Database, device *authtypes.Device,
roomID string, membership string, cfg *config.Dendrite, roomID string, membership string, cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
) util.JSONResponse { ) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{} verRes := api.QueryRoomVersionForRoomResponse{}
@ -71,7 +69,7 @@ func SendMembership(
} }
inviteStored, jsonErrResp := checkAndProcessThreepid( inviteStored, jsonErrResp := checkAndProcessThreepid(
req, device, &body, cfg, rsAPI, accountDB, producer, req, device, &body, cfg, rsAPI, accountDB,
membership, roomID, evTime, membership, roomID, evTime,
) )
if jsonErrResp != nil { if jsonErrResp != nil {
@ -112,8 +110,8 @@ func SendMembership(
switch membership { switch membership {
case gomatrixserverlib.Invite: case gomatrixserverlib.Invite:
// Invites need to be handled specially // Invites need to be handled specially
err = producer.SendInvite( err = roomserverAPI.SendInvite(
req.Context(), req.Context(), rsAPI,
event.Headered(verRes.RoomVersion), event.Headered(verRes.RoomVersion),
nil, // ask the roomserver to draw up invite room state for us nil, // ask the roomserver to draw up invite room state for us
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
@ -130,14 +128,14 @@ func SendMembership(
}{roomID} }{roomID}
fallthrough fallthrough
default: default:
_, err = producer.SendEvents( _, err = roomserverAPI.SendEvents(
req.Context(), req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
nil, nil,
) )
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
} }
@ -252,13 +250,12 @@ func checkAndProcessThreepid(
cfg *config.Dendrite, cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
accountDB accounts.Database, accountDB accounts.Database,
producer *producers.RoomserverProducer,
membership, roomID string, membership, roomID string,
evTime time.Time, evTime time.Time,
) (inviteStored bool, errRes *util.JSONResponse) { ) (inviteStored bool, errRes *util.JSONResponse) {
inviteStored, err := threepid.CheckAndProcessInvite( inviteStored, err := threepid.CheckAndProcessInvite(
req.Context(), device, body, cfg, rsAPI, accountDB, producer, req.Context(), device, body, cfg, rsAPI, accountDB,
membership, roomID, evTime, membership, roomID, evTime,
) )
if err == threepid.ErrMissingParameter { if err == threepid.ErrMissingParameter {

View File

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -94,8 +93,7 @@ func GetAvatarURL(
// nolint:gocyclo // nolint:gocyclo
func SetAvatarURL( func SetAvatarURL(
req *http.Request, accountDB accounts.Database, device *authtypes.Device, req *http.Request, accountDB accounts.Database, device *authtypes.Device,
userID string, cfg *config.Dendrite, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse { ) util.JSONResponse {
if userID != device.UserID { if userID != device.UserID {
return util.JSONResponse{ return util.JSONResponse{
@ -167,8 +165,8 @@ func SetAvatarURL(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -209,8 +207,7 @@ func GetDisplayName(
// nolint:gocyclo // nolint:gocyclo
func SetDisplayName( func SetDisplayName(
req *http.Request, accountDB accounts.Database, device *authtypes.Device, req *http.Request, accountDB accounts.Database, device *authtypes.Device,
userID string, cfg *config.Dendrite, userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse { ) util.JSONResponse {
if userID != device.UserID { if userID != device.UserID {
return util.JSONResponse{ return util.JSONResponse{
@ -282,8 +279,8 @@ func SetDisplayName(
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View File

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
@ -48,7 +49,7 @@ const pathPrefixUnstable = "/client/unstable"
// nolint: gocyclo // nolint: gocyclo
func Setup( func Setup(
publicAPIMux *mux.Router, cfg *config.Dendrite, publicAPIMux *mux.Router, cfg *config.Dendrite,
producer *producers.RoomserverProducer, eduAPI eduServerAPI.EDUServerInputAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
accountDB accounts.Database, accountDB accounts.Database,
@ -56,7 +57,6 @@ func Setup(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing, keyRing gomatrixserverlib.KeyRing,
syncProducer *producers.SyncAPIProducer, syncProducer *producers.SyncAPIProducer,
eduProducer *producers.EDUServerProducer,
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI, federationSender federationSenderAPI.FederationSenderInternalAPI,
) { ) {
@ -89,7 +89,7 @@ func Setup(
r0mux.Handle("/createRoom", r0mux.Handle("/createRoom",
internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return CreateRoom(req, device, cfg, producer, accountDB, rsAPI, asAPI) return CreateRoom(req, device, cfg, accountDB, rsAPI, asAPI)
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}", r0mux.Handle("/join/{roomIDOrAlias}",
@ -125,7 +125,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI, producer) return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI)
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}", r0mux.Handle("/rooms/{roomID}/send/{eventType}",
@ -134,7 +134,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, producer, nil) return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil)
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
@ -145,7 +145,7 @@ func Setup(
} }
txnID := vars["txnID"] txnID := vars["txnID"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID,
nil, cfg, rsAPI, producer, transactionsCache) nil, cfg, rsAPI, transactionsCache)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/event/{eventID}", r0mux.Handle("/rooms/{roomID}/event/{eventID}",
@ -194,7 +194,7 @@ func Setup(
if strings.HasSuffix(eventType, "/") { if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1] eventType = eventType[:len(eventType)-1]
} }
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, producer, nil) return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
@ -205,7 +205,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
stateKey := vars["stateKey"] stateKey := vars["stateKey"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, producer, nil) return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
@ -269,7 +269,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer) return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
@ -280,7 +280,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
txnID := vars["txnID"] txnID := vars["txnID"]
return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
@ -294,7 +294,7 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
txnID := vars["txnID"] txnID := vars["txnID"]
return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
@ -386,7 +386,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows // Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@ -408,7 +408,7 @@ func Setup(
if err != nil { if err != nil {
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
}), }),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows // Browsers use the OPTIONS HTTP method to check if the CORS policy allows

View File

@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/transactions"
@ -46,7 +45,6 @@ func SendEvent(
roomID, eventType string, txnID, stateKey *string, roomID, eventType string, txnID, stateKey *string,
cfg *config.Dendrite, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
producer *producers.RoomserverProducer,
txnCache *transactions.Cache, txnCache *transactions.Cache,
) util.JSONResponse { ) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
@ -80,8 +78,8 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID // pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded // event ID in case of duplicate transaction is discarded
eventID, err := producer.SendEvents( eventID, err := api.SendEvents(
req.Context(), req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion), e.Headered(verRes.RoomVersion),
}, },
@ -89,7 +87,7 @@ func SendEvent(
txnAndSessionID, txnAndSessionID,
) )
if err != nil { if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
util.GetLogger(req.Context()).WithFields(logrus.Fields{ util.GetLogger(req.Context()).WithFields(logrus.Fields{

View File

@ -19,7 +19,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -28,7 +28,7 @@ import (
// sends the device events to the EDU Server // sends the device events to the EDU Server
func SendToDevice( func SendToDevice(
req *http.Request, device *authtypes.Device, req *http.Request, device *authtypes.Device,
eduProducer *producers.EDUServerProducer, eduAPI api.EDUServerInputAPI,
txnCache *transactions.Cache, txnCache *transactions.Cache,
eventType string, txnID *string, eventType string, txnID *string,
) util.JSONResponse { ) util.JSONResponse {
@ -48,8 +48,8 @@ func SendToDevice(
for userID, byUser := range httpReq.Messages { for userID, byUser := range httpReq.Messages {
for deviceID, message := range byUser { for deviceID, message := range byUser {
if err := eduProducer.SendToDevice( if err := api.SendToDevice(
req.Context(), device.UserID, userID, deviceID, eventType, message, req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message,
); err != nil { ); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()

View File

@ -20,8 +20,8 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -35,7 +35,7 @@ type typingContentJSON struct {
func SendTyping( func SendTyping(
req *http.Request, device *authtypes.Device, roomID string, req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB accounts.Database, userID string, accountDB accounts.Database,
eduProducer *producers.EDUServerProducer, eduAPI api.EDUServerInputAPI,
) util.JSONResponse { ) util.JSONResponse {
if device.UserID != userID { if device.UserID != userID {
return util.JSONResponse{ return util.JSONResponse{
@ -69,8 +69,8 @@ func SendTyping(
return *resErr return *resErr
} }
if err = eduProducer.SendTyping( if err = api.SendTyping(
req.Context(), userID, roomID, r.Typing, r.Timeout, req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout,
); err != nil { ); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()

View File

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -88,7 +87,7 @@ func CheckAndProcessInvite(
ctx context.Context, ctx context.Context,
device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite, device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, db accounts.Database, rsAPI api.RoomserverInternalAPI, db accounts.Database,
producer *producers.RoomserverProducer, membership string, roomID string, membership string, roomID string,
evTime time.Time, evTime time.Time,
) (inviteStoredOnIDServer bool, err error) { ) (inviteStoredOnIDServer bool, err error) {
if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") { if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") {
@ -112,7 +111,7 @@ func CheckAndProcessInvite(
// "m.room.third_party_invite" have to be emitted from the data in // "m.room.third_party_invite" have to be emitted from the data in
// storeInviteRes. // storeInviteRes.
err = emit3PIDInviteEvent( err = emit3PIDInviteEvent(
ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, producer, evTime, ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, evTime,
) )
inviteStoredOnIDServer = err == nil inviteStoredOnIDServer = err == nil
@ -331,7 +330,7 @@ func emit3PIDInviteEvent(
ctx context.Context, ctx context.Context,
body *MembershipRequest, res *idServerStoreInviteResponse, body *MembershipRequest, res *idServerStoreInviteResponse,
device *authtypes.Device, roomID string, cfg *config.Dendrite, device *authtypes.Device, roomID string, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, producer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
evTime time.Time, evTime time.Time,
) error { ) error {
builder := &gomatrixserverlib.EventBuilder{ builder := &gomatrixserverlib.EventBuilder{
@ -359,8 +358,8 @@ func emit3PIDInviteEvent(
return err return err
} }
_, err = producer.SendEvents( _, err = api.SendEvents(
ctx, ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion), (*event).Headered(queryRes.RoomVersion),
}, },

View File

@ -29,7 +29,6 @@ import (
p2phttp "github.com/libp2p/go-libp2p-http" p2phttp "github.com/libp2p/go-libp2p-http"
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage" "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
@ -149,7 +148,6 @@ func main() {
&base.Base, federation, rsAPI, keyRing, &base.Base, federation, rsAPI, keyRing,
) )
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -166,7 +164,6 @@ func main() {
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI, FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI, ServerKeyAPI: serverKeyAPI,

View File

@ -25,7 +25,6 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
@ -153,7 +152,6 @@ func main() {
rsComponent.SetFederationSenderAPI(fsAPI) rsComponent.SetFederationSenderAPI(fsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -170,7 +168,6 @@ func main() {
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI, FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI, ServerKeyAPI: serverKeyAPI,

View File

@ -15,7 +15,6 @@
package main package main
import ( import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/basecomponent"
) )
@ -33,12 +32,10 @@ func main() {
fsAPI := base.FederationSenderHTTPClient() fsAPI := base.FederationSenderHTTPClient()
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
asAPI := base.AppserviceHTTPClient() asAPI := base.AppserviceHTTPClient()
// TODO: this isn't a producer
eduProducer := producers.NewEDUServerProducer(base.EDUServerClient())
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing, base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing,
rsAPI, asAPI, fsAPI, eduProducer, rsAPI, asAPI, fsAPI, base.EDUServerClient(),
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View File

@ -19,7 +19,6 @@ import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
@ -105,7 +104,6 @@ func main() {
} }
rsComponent.SetFederationSenderAPI(fsAPI) rsComponent.SetFederationSenderAPI(fsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -122,7 +120,6 @@ func main() {
AppserviceAPI: asAPI, AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI, FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI, ServerKeyAPI: serverKeyAPI,

View File

@ -23,7 +23,6 @@ import (
"syscall/js" "syscall/js"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
@ -209,7 +208,6 @@ func main() {
rsAPI.SetFederationSenderAPI(fedSenderAPI) rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -226,7 +224,6 @@ func main() {
AppserviceAPI: asQuery, AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI, EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fedSenderAPI, FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
//ServerKeyAPI: serverKeyAPI, //ServerKeyAPI: serverKeyAPI,

View File

@ -1,3 +1,5 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
@ -10,35 +12,22 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package producers package api
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"time" "time"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
// EDUServerProducer produces events for the EDU server to consume
type EDUServerProducer struct {
InputAPI api.EDUServerInputAPI
}
// NewEDUServerProducer creates a new EDUServerProducer
func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
return &EDUServerProducer{
InputAPI: inputAPI,
}
}
// SendTyping sends a typing event to EDU server // SendTyping sends a typing event to EDU server
func (p *EDUServerProducer) SendTyping( func SendTyping(
ctx context.Context, userID, roomID string, ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string,
typing bool, timeoutMS int64, typing bool, timeoutMS int64,
) error { ) error {
requestData := api.InputTypingEvent{ requestData := InputTypingEvent{
UserID: userID, UserID: userID,
RoomID: roomID, RoomID: roomID,
Typing: typing, Typing: typing,
@ -46,24 +35,24 @@ func (p *EDUServerProducer) SendTyping(
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
} }
var response api.InputTypingEventResponse var response InputTypingEventResponse
err := p.InputAPI.InputTypingEvent( err := eduAPI.InputTypingEvent(
ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response, ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response,
) )
return err return err
} }
// SendToDevice sends a typing event to EDU server // SendToDevice sends a typing event to EDU server
func (p *EDUServerProducer) SendToDevice( func SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string, ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string,
message interface{}, message interface{},
) error { ) error {
js, err := json.Marshal(message) js, err := json.Marshal(message)
if err != nil { if err != nil {
return err return err
} }
requestData := api.InputSendToDeviceEvent{ requestData := InputSendToDeviceEvent{
UserID: userID, UserID: userID,
DeviceID: deviceID, DeviceID: deviceID,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
@ -72,9 +61,9 @@ func (p *EDUServerProducer) SendToDevice(
Content: js, Content: js,
}, },
} }
request := api.InputSendToDeviceEventRequest{ request := InputSendToDeviceEventRequest{
InputSendToDeviceEvent: requestData, InputSendToDeviceEvent: requestData,
} }
response := api.InputSendToDeviceEventResponse{} response := InputSendToDeviceEventResponse{}
return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response) return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
} }

View File

@ -19,12 +19,11 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
// TODO: Are we really wanting to pull in the producer from clientapi
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -40,13 +39,12 @@ func AddPublicRoutes(
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderInternalAPI, federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
eduProducer *producers.EDUServerProducer, eduAPI eduserverAPI.EDUServerInputAPI,
) { ) {
roomserverProducer := producers.NewRoomserverProducer(rsAPI)
routing.Setup( routing.Setup(
router, cfg, rsAPI, asAPI, roomserverProducer, router, cfg, rsAPI, asAPI,
eduProducer, federationSenderAPI, *keyRing, eduAPI, federationSenderAPI, *keyRing,
federation, accountsDB, deviceDB, federation, accountsDB, deviceDB,
) )
} }

View File

@ -20,8 +20,8 @@ import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverVersion "github.com/matrix-org/dendrite/roomserver/version" roomserverVersion "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
@ -34,7 +34,7 @@ func Invite(
roomID string, roomID string,
eventID string, eventID string,
cfg *config.Dendrite, cfg *config.Dendrite,
producer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
) util.JSONResponse { ) util.JSONResponse {
inviteReq := gomatrixserverlib.InviteV2Request{} inviteReq := gomatrixserverlib.InviteV2Request{}
@ -98,8 +98,8 @@ func Invite(
) )
// Add the invite event to the roomserver. // Add the invite event to the roomserver.
if err = producer.SendInvite( if err = api.SendInvite(
httpReq.Context(), httpReq.Context(), rsAPI,
signedEvent.Headered(inviteReq.RoomVersion()), signedEvent.Headered(inviteReq.RoomVersion()),
inviteReq.InviteRoomState(), inviteReq.InviteRoomState(),
event.Origin(), event.Origin(),

View File

@ -21,7 +21,6 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -144,7 +143,6 @@ func SendJoin(
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
cfg *config.Dendrite, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
roomID, eventID string, roomID, eventID string,
) util.JSONResponse { ) util.JSONResponse {
@ -267,8 +265,8 @@ func SendJoin(
// We are responsible for notifying other servers that the user has joined // We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName // the room, so set SendAsServer to cfg.Matrix.ServerName
if !alreadyJoined { if !alreadyJoined {
_, err = producer.SendEvents( _, err = api.SendEvents(
httpReq.Context(), httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion), event.Headered(stateAndAuthChainResponse.RoomVersion),
}, },
@ -276,7 +274,7 @@ func SendJoin(
nil, nil,
) )
if err != nil { if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
} }

View File

@ -17,7 +17,6 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -113,13 +112,13 @@ func SendLeave(
httpReq *http.Request, httpReq *http.Request,
request *gomatrixserverlib.FederationRequest, request *gomatrixserverlib.FederationRequest,
cfg *config.Dendrite, cfg *config.Dendrite,
producer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
roomID, eventID string, roomID, eventID string,
) util.JSONResponse { ) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{} verRes := api.QueryRoomVersionForRoomResponse{}
if err := producer.RsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()), JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
@ -194,8 +193,8 @@ func SendLeave(
// Send the events to the room server. // Send the events to the room server.
// We are responsible for notifying other servers that the user has left // We are responsible for notifying other servers that the user has left
// the room, so set SendAsServer to cfg.Matrix.ServerName // the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents( _, err = api.SendEvents(
httpReq.Context(), httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion), event.Headered(verRes.RoomVersion),
}, },

View File

@ -21,7 +21,7 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/producers" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
@ -49,8 +49,7 @@ func Setup(
cfg *config.Dendrite, cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer, eduAPI eduserverAPI.EDUServerInputAPI,
eduProducer *producers.EDUServerProducer,
fsAPI federationSenderAPI.FederationSenderInternalAPI, fsAPI federationSenderAPI.FederationSenderInternalAPI,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
@ -82,7 +81,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send( return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, producer, eduProducer, keys, federation, cfg, rsAPI, eduAPI, keys, federation,
) )
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)
@ -92,14 +91,14 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Invite( return Invite(
httpReq, request, vars["roomID"], vars["eventID"], httpReq, request, vars["roomID"], vars["eventID"],
cfg, producer, keys, cfg, rsAPI, keys,
) )
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)
v1fedmux.Handle("/3pid/onbind", internal.MakeExternalAPI("3pid_onbind", v1fedmux.Handle("/3pid/onbind", internal.MakeExternalAPI("3pid_onbind",
func(req *http.Request) util.JSONResponse { func(req *http.Request) util.JSONResponse {
return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, producer, federation, accountDB) return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, federation, accountDB)
}, },
)).Methods(http.MethodPost, http.MethodOptions) )).Methods(http.MethodPost, http.MethodOptions)
@ -107,7 +106,7 @@ func Setup(
"exchange_third_party_invite", cfg.Matrix.ServerName, keys, wakeup, "exchange_third_party_invite", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return ExchangeThirdPartyInvite( return ExchangeThirdPartyInvite(
httpReq, request, vars["roomID"], rsAPI, cfg, federation, producer, httpReq, request, vars["roomID"], rsAPI, cfg, federation,
) )
}, },
)).Methods(http.MethodPut, http.MethodOptions) )).Methods(http.MethodPut, http.MethodOptions)
@ -206,7 +205,7 @@ func Setup(
roomID := vars["roomID"] roomID := vars["roomID"]
eventID := vars["eventID"] eventID := vars["eventID"]
res := SendJoin( res := SendJoin(
httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, httpReq, request, cfg, rsAPI, keys, roomID, eventID,
) )
return util.JSONResponse{ return util.JSONResponse{
Headers: res.Headers, Headers: res.Headers,
@ -224,7 +223,7 @@ func Setup(
roomID := vars["roomID"] roomID := vars["roomID"]
eventID := vars["eventID"] eventID := vars["eventID"]
return SendJoin( return SendJoin(
httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, httpReq, request, cfg, rsAPI, keys, roomID, eventID,
) )
}, },
)).Methods(http.MethodPut) )).Methods(http.MethodPut)
@ -246,7 +245,7 @@ func Setup(
roomID := vars["roomID"] roomID := vars["roomID"]
eventID := vars["eventID"] eventID := vars["eventID"]
return SendLeave( return SendLeave(
httpReq, request, cfg, producer, keys, roomID, eventID, httpReq, request, cfg, rsAPI, keys, roomID, eventID,
) )
}, },
)).Methods(http.MethodPut) )).Methods(http.MethodPut)

View File

@ -21,7 +21,7 @@ import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -36,20 +36,18 @@ func Send(
txnID gomatrixserverlib.TransactionID, txnID gomatrixserverlib.TransactionID,
cfg *config.Dendrite, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
producer *producers.RoomserverProducer, eduAPI eduserverAPI.EDUServerInputAPI,
eduProducer *producers.EDUServerProducer,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
) util.JSONResponse { ) util.JSONResponse {
t := txnReq{ t := txnReq{
context: httpReq.Context(), context: httpReq.Context(),
rsAPI: rsAPI, rsAPI: rsAPI,
producer: producer, eduAPI: eduAPI,
eduProducer: eduProducer, keys: keys,
keys: keys, federation: federation,
federation: federation, haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), newEvents: make(map[string]bool),
newEvents: make(map[string]bool),
} }
var txnEvents struct { var txnEvents struct {
@ -91,12 +89,11 @@ func Send(
type txnReq struct { type txnReq struct {
gomatrixserverlib.Transaction gomatrixserverlib.Transaction
context context.Context context context.Context
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
producer *producers.RoomserverProducer eduAPI eduserverAPI.EDUServerInputAPI
eduProducer *producers.EDUServerProducer keys gomatrixserverlib.JSONVerifier
keys gomatrixserverlib.JSONVerifier federation txnFederationClient
federation txnFederationClient
// local cache of events for auth checks, etc - this may include events // local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of. // which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent haveEvents map[string]*gomatrixserverlib.HeaderedEvent
@ -262,7 +259,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event") util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event")
continue continue
} }
if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { if err := eduserverAPI.SendTyping(t.context, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server") util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
} }
case gomatrixserverlib.MDirectToDevice: case gomatrixserverlib.MDirectToDevice:
@ -275,7 +272,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for userID, byUser := range directPayload.Messages { for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser { for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here // TODO: check that the user and the device actually exist here
if err := t.eduProducer.SendToDevice(t.context, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { if err := eduserverAPI.SendToDevice(t.context, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{ util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender, "sender": directPayload.Sender,
"user_id": userID, "user_id": userID,
@ -325,8 +322,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
} }
// pass the event to the roomserver // pass the event to the roomserver
_, err := t.producer.SendEvents( _, err := api.SendEvents(
t.context, t.context, t.rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion), e.Headered(stateResp.RoomVersion),
}, },
@ -408,7 +405,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// pass the event along with the state to the roomserver using a background context so we don't // pass the event along with the state to the roomserver using a background context so we don't
// needlessly expire // needlessly expire
return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs()) return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs())
} }
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)

View File

@ -8,7 +8,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/producers"
eduAPI "github.com/matrix-org/dendrite/eduserver/api" eduAPI "github.com/matrix-org/dendrite/eduserver/api"
fsAPI "github.com/matrix-org/dendrite/federationsender/api" fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -339,14 +338,13 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver
func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
t := &txnReq{ t := &txnReq{
context: context.Background(), context: context.Background(),
rsAPI: rsAPI, rsAPI: rsAPI,
producer: producers.NewRoomserverProducer(rsAPI), eduAPI: &testEDUProducer{},
eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}), keys: &testNopJSONVerifier{},
keys: &testNopJSONVerifier{}, federation: fedClient,
federation: fedClient, haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), newEvents: make(map[string]bool),
newEvents: make(map[string]bool),
} }
t.PDUs = pdus t.PDUs = pdus
t.Origin = testOrigin t.Origin = testOrigin

View File

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -60,7 +59,7 @@ var (
func CreateInvitesFrom3PIDInvites( func CreateInvitesFrom3PIDInvites(
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite, asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite,
producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
accountDB accounts.Database, accountDB accounts.Database,
) util.JSONResponse { ) util.JSONResponse {
var body invites var body invites
@ -92,8 +91,8 @@ func CreateInvitesFrom3PIDInvites(
} }
// Send all the events // Send all the events
if _, err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil { if _, err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }
@ -111,7 +110,6 @@ func ExchangeThirdPartyInvite(
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
cfg *config.Dendrite, cfg *config.Dendrite,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
producer *producers.RoomserverProducer,
) util.JSONResponse { ) util.JSONResponse {
var builder gomatrixserverlib.EventBuilder var builder gomatrixserverlib.EventBuilder
if err := json.Unmarshal(request.Content(), &builder); err != nil { if err := json.Unmarshal(request.Content(), &builder); err != nil {
@ -176,15 +174,15 @@ func ExchangeThirdPartyInvite(
} }
// Send the event to the roomserver // Send the event to the roomserver
if _, err = producer.SendEvents( if _, err = api.SendEvents(
httpReq.Context(), httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{ []gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion), signedEvent.Event.Headered(verRes.RoomVersion),
}, },
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
nil, nil,
); err != nil { ); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError() return jsonerror.InternalServerError()
} }

View File

@ -7,7 +7,6 @@ import (
"github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/producers"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
@ -41,8 +40,6 @@ type Monolith struct {
RoomserverAPI roomserverAPI.RoomserverInternalAPI RoomserverAPI roomserverAPI.RoomserverInternalAPI
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
// TODO: remove, this isn't even a producer
EDUProducer *producers.EDUServerProducer
// TODO: can we remove this? It's weird that we are required the database // TODO: can we remove this? It's weird that we are required the database
// yet every other component can do that on its own. libp2p-demo uses a custom // yet every other component can do that on its own. libp2p-demo uses a custom
// database though annoyingly. // database though annoyingly.
@ -65,7 +62,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
federationapi.AddPublicRoutes( federationapi.AddPublicRoutes(
publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient, publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI, m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI,
m.EDUProducer, m.EDUInternalAPI,
) )
mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB) mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB)
publicroomsapi.AddPublicRoutes( publicroomsapi.AddPublicRoutes(

View File

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd // Copyright 2020 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -12,63 +12,51 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package producers package api
import ( import (
"context" "context"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
// RoomserverProducer produces events for the roomserver to consume. // SendEvents to the roomserver The events are written with KindNew.
type RoomserverProducer struct { func SendEvents(
RsAPI api.RoomserverInternalAPI ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent,
} sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
// NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(rsAPI api.RoomserverInternalAPI) *RoomserverProducer {
return &RoomserverProducer{
RsAPI: rsAPI,
}
}
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
func (c *RoomserverProducer) SendEvents(
ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName,
txnID *api.TransactionID,
) (string, error) { ) (string, error) {
ires := make([]api.InputRoomEvent, len(events)) ires := make([]InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
ires[i] = api.InputRoomEvent{ ires[i] = InputRoomEvent{
Kind: api.KindNew, Kind: KindNew,
Event: event, Event: event,
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer), SendAsServer: string(sendAsServer),
TransactionID: txnID, TransactionID: txnID,
} }
} }
return c.SendInputRoomEvents(ctx, ires) return SendInputRoomEvents(ctx, rsAPI, ires)
} }
// SendEventWithState writes an event with KindNew to the roomserver input log // SendEventWithState writes an event with KindNew to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is // with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs // marked as `true` in haveEventIDs
func (c *RoomserverProducer) SendEventWithState( func SendEventWithState(
ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState,
event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
) error { ) error {
outliers, err := state.Events() outliers, err := state.Events()
if err != nil { if err != nil {
return err return err
} }
var ires []api.InputRoomEvent var ires []InputRoomEvent
for _, outlier := range outliers { for _, outlier := range outliers {
if haveEventIDs[outlier.EventID()] { if haveEventIDs[outlier.EventID()] {
continue continue
} }
ires = append(ires, api.InputRoomEvent{ ires = append(ires, InputRoomEvent{
Kind: api.KindOutlier, Kind: KindOutlier,
Event: outlier.Headered(event.RoomVersion), Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(), AuthEventIDs: outlier.AuthEventIDs(),
}) })
@ -79,39 +67,40 @@ func (c *RoomserverProducer) SendEventWithState(
stateEventIDs[i] = state.StateEvents[i].EventID() stateEventIDs[i] = state.StateEvents[i].EventID()
} }
ires = append(ires, api.InputRoomEvent{ ires = append(ires, InputRoomEvent{
Kind: api.KindNew, Kind: KindNew,
Event: event, Event: event,
AuthEventIDs: event.AuthEventIDs(), AuthEventIDs: event.AuthEventIDs(),
HasState: true, HasState: true,
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
}) })
_, err = c.SendInputRoomEvents(ctx, ires) _, err = SendInputRoomEvents(ctx, rsAPI, ires)
return err return err
} }
// SendInputRoomEvents writes the given input room events to the roomserver input API. // SendInputRoomEvents to the roomserver.
func (c *RoomserverProducer) SendInputRoomEvents( func SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent, ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent,
) (eventID string, err error) { ) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires} request := InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse var response InputRoomEventsResponse
err = c.RsAPI.InputRoomEvents(ctx, &request, &response) err = rsAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID eventID = response.EventID
return return
} }
// SendInvite writes the invite event to the roomserver input API. // SendInvite event to the roomserver.
// This should only be needed for invite events that occur outside of a known room. // This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method. // If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite( func SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, ctx context.Context,
rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState, inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) error { ) error {
request := api.InputRoomEventsRequest{ request := InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{ InputInviteEvents: []InputInviteEvent{{
Event: inviteEvent, Event: inviteEvent,
InviteRoomState: inviteRoomState, InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion, RoomVersion: inviteEvent.RoomVersion,
@ -119,6 +108,6 @@ func (c *RoomserverProducer) SendInvite(
TransactionID: txnID, TransactionID: txnID,
}}, }},
} }
var response api.InputRoomEventsResponse var response InputRoomEventsResponse
return c.RsAPI.InputRoomEvents(ctx, &request, &response) return rsAPI.InputRoomEvents(ctx, &request, &response)
} }