diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index e3e0ef9d..621920ef 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/internal" "github.com/matrix-org/dendrite/federationsender/inthttp" - "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" @@ -49,13 +48,13 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to connect to federation sender db") } - roomserverProducer := producers.NewRoomserverProducer( - rsAPI, base.Cfg.Matrix.ServerName, base.Cfg.Matrix.KeyID, base.Cfg.Matrix.PrivateKey, - ) - statistics := &types.Statistics{} queues := queue.NewOutgoingQueues( - base.Cfg.Matrix.ServerName, federation, roomserverProducer, statistics, + base.Cfg.Matrix.ServerName, federation, rsAPI, statistics, &queue.SigningInfo{ + KeyID: base.Cfg.Matrix.KeyID, + PrivateKey: base.Cfg.Matrix.PrivateKey, + ServerName: base.Cfg.Matrix.ServerName, + }, ) rsConsumer := consumers.NewOutputRoomEventConsumer( @@ -73,5 +72,5 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start typing server consumer") } - return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, statistics, queues) + return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, statistics, queues) } diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index c2ea0d41..0dca32fc 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -1,22 +1,20 @@ package internal import ( - "github.com/matrix-org/dendrite/federationsender/api" - "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" ) // FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI type FederationSenderInternalAPI struct { - api.FederationSenderInternalAPI db storage.Database cfg *config.Dendrite statistics *types.Statistics - producer *producers.RoomserverProducer + rsAPI api.RoomserverInternalAPI federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing queues *queue.OutgoingQueues @@ -24,7 +22,7 @@ type FederationSenderInternalAPI struct { func NewFederationSenderInternalAPI( db storage.Database, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, statistics *types.Statistics, @@ -33,7 +31,7 @@ func NewFederationSenderInternalAPI( return &FederationSenderInternalAPI{ db: db, cfg: cfg, - producer: producer, + rsAPI: rsAPI, federation: federation, keyRing: keyRing, statistics: statistics, diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index c601e960..7ced4af8 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -7,6 +7,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/internal/perform" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -175,10 +176,11 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( // If we successfully performed a send_join above then the other // server now thinks we're a part of the room. Send the newly // returned state to the roomserver to update our local view. - if err = r.producer.SendEventWithState( - ctx, - respSendJoin.ToRespState(), - event.Headered(respMakeJoin.RoomVersion), + respState := respSendJoin.ToRespState() + if err = roomserverAPI.SendEventWithState( + ctx, r.rsAPI, + &respState, + event.Headered(respMakeJoin.RoomVersion), nil, ); err != nil { return fmt.Errorf("r.producer.SendEventWithState: %w", err) } diff --git a/federationsender/producers/roomserver.go b/federationsender/producers/roomserver.go deleted file mode 100644 index 76fedf53..00000000 --- a/federationsender/producers/roomserver.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package producers - -import ( - "context" - "crypto/ed25519" - - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" -) - -// RoomserverProducer produces events for the roomserver to consume. -type RoomserverProducer struct { - InputAPI api.RoomserverInternalAPI - serverName gomatrixserverlib.ServerName - keyID gomatrixserverlib.KeyID - privateKey ed25519.PrivateKey -} - -// NewRoomserverProducer creates a new RoomserverProducer -func NewRoomserverProducer( - rsAPI api.RoomserverInternalAPI, serverName gomatrixserverlib.ServerName, - keyID gomatrixserverlib.KeyID, privateKey ed25519.PrivateKey, -) *RoomserverProducer { - return &RoomserverProducer{ - InputAPI: rsAPI, - serverName: serverName, - keyID: keyID, - privateKey: privateKey, - } -} - -// SendInviteResponse drops an invite response back into the roomserver so that users -// already in the room will be notified of the new invite. The invite response is signed -// by the remote side. -func (c *RoomserverProducer) SendInviteResponse( - ctx context.Context, res gomatrixserverlib.RespInviteV2, roomVersion gomatrixserverlib.RoomVersion, -) (string, error) { - ev := res.Event.Sign(string(c.serverName), c.keyID, c.privateKey).Headered(roomVersion) - ire := api.InputRoomEvent{ - Kind: api.KindNew, - Event: ev, - AuthEventIDs: ev.AuthEventIDs(), - SendAsServer: string(c.serverName), - TransactionID: nil, - } - return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire}) -} - -// SendEventWithState writes an event with KindNew to the roomserver input log -// with the state at the event as KindOutlier before it. -func (c *RoomserverProducer) SendEventWithState( - ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, -) error { - outliers, err := state.Events() - if err != nil { - return err - } - - var ires []api.InputRoomEvent - for _, outlier := range outliers { - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindOutlier, - Event: outlier.Headered(event.RoomVersion), - AuthEventIDs: outlier.AuthEventIDs(), - }) - } - - stateEventIDs := make([]string, len(state.StateEvents)) - for i := range state.StateEvents { - stateEventIDs[i] = state.StateEvents[i].EventID() - } - - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindNew, - Event: event, - AuthEventIDs: event.AuthEventIDs(), - HasState: true, - StateEventIDs: stateEventIDs, - }) - - _, err = c.SendInputRoomEvents(ctx, ires) - return err -} - -// SendInputRoomEvents writes the given input room events to the roomserver input API. -func (c *RoomserverProducer) SendInputRoomEvents( - ctx context.Context, ires []api.InputRoomEvent, -) (eventID string, err error) { - request := api.InputRoomEventsRequest{InputRoomEvents: ires} - var response api.InputRoomEventsResponse - err = c.InputAPI.InputRoomEvents(ctx, &request, &response) - eventID = response.EventID - return -} diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index bf9042f4..4449f9e6 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -20,8 +20,8 @@ import ( "fmt" "time" - "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -34,7 +34,8 @@ import ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { - rsProducer *producers.RoomserverProducer // roomserver producer + signing *SigningInfo + rsAPI api.RoomserverInternalAPI client *gomatrixserverlib.FederationClient // federation client origin gomatrixserverlib.ServerName // origin of requests destination gomatrixserverlib.ServerName // destination of requests @@ -370,11 +371,9 @@ func (oq *destinationQueue) nextInvites( return done, err } - if _, err = oq.rsProducer.SendInviteResponse( - context.TODO(), - inviteRes, - roomVersion, - ); err != nil { + invEv := inviteRes.Event.Sign(string(oq.signing.ServerName), oq.signing.KeyID, oq.signing.PrivateKey).Headered(roomVersion) + _, err = api.SendEvents(context.TODO(), oq.rsAPI, []gomatrixserverlib.HeaderedEvent{invEv}, oq.signing.ServerName, nil) + if err != nil { log.WithFields(log.Fields{ "event_id": ev.EventID(), "state_key": ev.StateKey(), diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 386a3397..5b8fc3c5 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -15,11 +15,12 @@ package queue import ( + "crypto/ed25519" "fmt" "sync" - "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" @@ -28,10 +29,11 @@ import ( // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { - rsProducer *producers.RoomserverProducer + rsAPI api.RoomserverInternalAPI origin gomatrixserverlib.ServerName client *gomatrixserverlib.FederationClient statistics *types.Statistics + signing *SigningInfo queuesMutex sync.Mutex // protects the below queues map[gomatrixserverlib.ServerName]*destinationQueue } @@ -40,18 +42,28 @@ type OutgoingQueues struct { func NewOutgoingQueues( origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient, - rsProducer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, statistics *types.Statistics, + signing *SigningInfo, ) *OutgoingQueues { return &OutgoingQueues{ - rsProducer: rsProducer, + rsAPI: rsAPI, origin: origin, client: client, statistics: statistics, + signing: signing, queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } } +// TODO: Move this somewhere useful for other components as we often need to ferry these 3 variables +// around together +type SigningInfo struct { + ServerName gomatrixserverlib.ServerName + KeyID gomatrixserverlib.KeyID + PrivateKey ed25519.PrivateKey +} + func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue { oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() @@ -64,7 +76,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ - rsProducer: oqs.rsProducer, + rsAPI: oqs.rsAPI, origin: oqs.origin, destination: destination, client: oqs.client, @@ -73,6 +85,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), retryServerCh: make(chan bool), + signing: oqs.signing, } oqs.queues[destination] = oq }