diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go new file mode 100644 index 00000000..c4cd0e59 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/typingserver.go @@ -0,0 +1,96 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/typingserver/api" + "github.com/matrix-org/gomatrixserverlib" + log "github.com/sirupsen/logrus" + "gopkg.in/Shopify/sarama.v1" +) + +// OutputTypingEventConsumer consumes events that originate in typing server. +type OutputTypingEventConsumer struct { + consumer *common.ContinualConsumer + db *storage.Database + queues *queue.OutgoingQueues + ServerName gomatrixserverlib.ServerName +} + +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers. +func NewOutputTypingEventConsumer( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + queues *queue.OutgoingQueues, + store *storage.Database, +) *OutputTypingEventConsumer { + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputTypingEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + c := &OutputTypingEventConsumer{ + consumer: &consumer, + queues: queues, + db: store, + ServerName: cfg.Matrix.ServerName, + } + consumer.ProcessMessage = c.onMessage + + return c +} + +// Start consuming from typing servers +func (t *OutputTypingEventConsumer) Start() error { + return t.consumer.Start() +} + +// onMessage is called for OutputTypingEvent received from the typing servers. +// Parses the msg, creates a matrix federation EDU and sends it to joined hosts. +func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + // Extract the typing event from msg. + var ote api.OutputTypingEvent + if err := json.Unmarshal(msg.Value, &ote); err != nil { + // Skip this msg but continue processing messages. + log.WithError(err).Errorf("typingserver output log: message parse failed") + return nil + } + + joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID) + if err != nil { + return err + } + + names := make([]gomatrixserverlib.ServerName, len(joined)) + for i := range joined { + names[i] = joined[i].ServerName + } + + edu := &gomatrixserverlib.EDU{Type: ote.Event.Type} + if edu.Content, err = json.Marshal(map[string]interface{}{ + "room_id": ote.Event.RoomID, + "user_id": ote.Event.UserID, + "typing": ote.Event.Typing, + }); err != nil { + return err + } + + return t.queues.SendEDU(edu, t.ServerName, names) +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go index fa54a05c..9b732b38 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go +++ b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go @@ -38,11 +38,18 @@ func SetupFederationSenderComponent( queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) - consumer := consumers.NewOutputRoomEventConsumer( + rsConsumer := consumers.NewOutputRoomEventConsumer( base.Cfg, base.KafkaConsumer, queues, federationSenderDB, queryAPI, ) - if err = consumer.Start(); err != nil { + if err = rsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start room server consumer") } + + tsConsumer := consumers.NewOutputTypingEventConsumer( + base.Cfg, base.KafkaConsumer, queues, federationSenderDB, + ) + if err := tsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start typing server consumer") + } } diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go index 2013a7a4..c0afe3be 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -33,12 +33,13 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName // The running mutex protects running, sentCounter, lastTransactionIDs and - // pendingEvents. + // pendingEvents, pendingEDUs. runningMutex sync.Mutex running bool sentCounter int lastTransactionIDs []gomatrixserverlib.TransactionID pendingEvents []*gomatrixserverlib.Event + pendingEDUs []*gomatrixserverlib.EDU } // Send event adds the event to the pending queue for the destination. @@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { } } +// sendEDU adds the EDU event to the pending queue for the destination. +// If the queue is empty then it starts a background goroutine to +// start sending event to that destination. +func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + oq.pendingEDUs = append(oq.pendingEDUs, e) + if !oq.running { + oq.running = true + go oq.backgroundSend() + } +} + func (oq *destinationQueue) backgroundSend() { for { t := oq.next() @@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() - if len(oq.pendingEvents) == 0 { + + if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { oq.running = false return nil } + var t gomatrixserverlib.Transaction now := gomatrixserverlib.AsTimestamp(time.Now()) t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) @@ -96,11 +112,20 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { if t.PreviousIDs == nil { t.PreviousIDs = []gomatrixserverlib.TransactionID{} } + oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + for _, pdu := range oq.pendingEvents { t.PDUs = append(t.PDUs, *pdu) } oq.pendingEvents = nil oq.sentCounter += len(t.PDUs) + + for _, edu := range oq.pendingEDUs { + t.EDUs = append(t.EDUs, *edu) + } + oq.pendingEDUs = nil + oq.sentCounter += len(t.EDUs) + return &t } diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go index d31c12f9..4a38dc08 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent( destinations []gomatrixserverlib.ServerName, ) error { if origin != oqs.origin { - // TODO: Support virtual hosting by allowing us to send events using - // different origin server names. - // For now assume we are always asked to send as the single server configured - // in the dendrite config. + // TODO: Support virtual hosting; gh issue #577. return fmt.Errorf( "sendevent: unexpected server to send as: got %q expected %q", origin, oqs.origin, @@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent( } oqs.queues[destination] = oq } + oq.sendEvent(ev) } + + return nil +} + +// SendEDU sends an EDU event to the destinations +func (oqs *OutgoingQueues) SendEDU( + e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName, + destinations []gomatrixserverlib.ServerName, +) error { + if origin != oqs.origin { + // TODO: Support virtual hosting; gh issue #577. + return fmt.Errorf( + "sendevent: unexpected server to send as: got %q expected %q", + origin, oqs.origin, + ) + } + + // Remove our own server from the list of destinations. + destinations = filterDestinations(oqs.origin, destinations) + + log.WithFields(log.Fields{ + "destinations": destinations, "edu_type": e.Type, + }).Info("Sending EDU event") + + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + for _, destination := range destinations { + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + origin: oqs.origin, + destination: destination, + client: oqs.client, + } + oqs.queues[destination] = oq + } + + oq.sendEDU(e) + } + return nil } diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go index 487de9e6..5d652a1a 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go @@ -97,10 +97,22 @@ func (s *joinedHostsStatements) deleteJoinedHosts( return err } -func (s *joinedHostsStatements) selectJoinedHosts( +func (s *joinedHostsStatements) selectJoinedHostsWithTx( ctx context.Context, txn *sql.Tx, roomID string, ) ([]types.JoinedHost, error) { stmt := common.TxStmt(txn, s.selectJoinedHostsStmt) + return joinedHostsFromStmt(ctx, stmt, roomID) +} + +func (s *joinedHostsStatements) selectJoinedHosts( + ctx context.Context, roomID string, +) ([]types.JoinedHost, error) { + return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID) +} + +func joinedHostsFromStmt( + ctx context.Context, stmt *sql.Stmt, roomID string, +) ([]types.JoinedHost, error) { rows, err := stmt.QueryContext(ctx, roomID) if err != nil { return nil, err @@ -118,5 +130,6 @@ func (s *joinedHostsStatements) selectJoinedHosts( ServerName: gomatrixserverlib.ServerName(serverName), }) } + return result, nil } diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go index e84d639d..3a0f8775 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -92,7 +92,7 @@ func (d *Database) UpdateRoom( } } - joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID) + joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID) if err != nil { return err } @@ -110,3 +110,12 @@ func (d *Database) UpdateRoom( }) return } + +// GetJoinedHosts returns the currently joined hosts for room, +// as known to federationserver. +// Returns an error if something goes wrong. +func (d *Database) GetJoinedHosts( + ctx context.Context, roomID string, +) ([]types.JoinedHost, error) { + return d.selectJoinedHosts(ctx, roomID) +} diff --git a/src/github.com/matrix-org/dendrite/typingserver/api/output.go b/src/github.com/matrix-org/dendrite/typingserver/api/output.go index 08f83499..813b9b7c 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/typingserver/api/output.go @@ -13,19 +13,19 @@ package api // OutputTypingEvent is an entry in typing server output kafka log. +// This contains the event with extra fields used to create 'm.typing' event +// in clientapi & federation. type OutputTypingEvent struct { // The Event for the typing edu event. Event TypingEvent `json:"event"` + // Users typing in the room when the event was generated. + TypingUsers []string `json:"typing_users"` } // TypingEvent represents a matrix edu event of type 'm.typing'. type TypingEvent struct { - Type string `json:"type"` - RoomID string `json:"room_id"` - Content TypingEventContent `json:"content"` -} - -// TypingEventContent for TypingEvent -type TypingEventContent struct { - UserIDs []string `json:"user_ids"` + Type string `json:"type"` + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` } diff --git a/src/github.com/matrix-org/dendrite/typingserver/input/input.go b/src/github.com/matrix-org/dendrite/typingserver/input/input.go index 735c4da6..b9968ce4 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/typingserver/input/input.go @@ -53,24 +53,29 @@ func (t *TypingServerInputAPI) InputTypingEvent( t.Cache.RemoveUser(ite.UserID, ite.RoomID) } - return t.sendUpdateForRoom(ite.RoomID) + return t.sendEvent(ite) } -func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error { - userIDs := t.Cache.GetTypingUsers(roomID) - event := &api.TypingEvent{ - Type: gomatrixserverlib.MTyping, - RoomID: roomID, - Content: api.TypingEventContent{UserIDs: userIDs}, +func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { + userIDs := t.Cache.GetTypingUsers(ite.RoomID) + ev := &api.TypingEvent{ + Type: gomatrixserverlib.MTyping, + RoomID: ite.RoomID, + UserID: ite.UserID, } - eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event}) + ote := &api.OutputTypingEvent{ + Event: *ev, + TypingUsers: userIDs, + } + + eventJSON, err := json.Marshal(ote) if err != nil { return err } m := &sarama.ProducerMessage{ Topic: string(t.OutputTypingEventTopic), - Key: sarama.StringEncoder(roomID), + Key: sarama.StringEncoder(ite.RoomID), Value: sarama.ByteEncoder(eventJSON), }