From 8f1dca4e74f5154222f5771fc618dde2ca7676ff Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Jun 2017 16:10:17 +0100 Subject: [PATCH] =?UTF-8?q?Add=20a=20component=20for=20sending=20event=20t?= =?UTF-8?q?o=20remote=20matrix=20servers=20using=20fede=E2=80=A6=20(#148)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add a component for sending event to remote matrix servers using federation * Use named constants * Comment on why we combine the deltas * Comments * Copyright headers! * Fix the copy+Pasted comment * Add tests for combineDeltas * Review comments * More review comments * More comments * Even more comments * Name the mutexes * Document what the mutexes are protecting * Simplify the SQL, and more comments --- .../dendrite-federation-sender-server/main.go | 74 ++++ .../dendrite/common/config/config.go | 14 +- .../federationsender/consumers/roomserver.go | 361 ++++++++++++++++++ .../consumers/roomserver_test.go | 53 +++ .../queue/destinationqueue.go | 105 +++++ .../dendrite/federationsender/queue/queue.go | 95 +++++ .../storage/joined_hosts_table.go | 111 ++++++ .../federationsender/storage/room_table.go | 89 +++++ .../federationsender/storage/storage.go | 126 ++++++ .../dendrite/federationsender/types/types.go | 45 +++ 10 files changed, 1068 insertions(+), 5 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/queue/queue.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/storage/storage.go create mode 100644 src/github.com/matrix-org/dendrite/federationsender/types/types.go diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go new file mode 100644 index 00000000..3c7e5c47 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go @@ -0,0 +1,74 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "net/http" + "os" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationsender/consumers" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/prometheus/client_golang/prometheus" + + log "github.com/Sirupsen/logrus" +) + +var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") + +func main() { + common.SetupLogging(os.Getenv("LOG_DIR")) + + flag.Parse() + + if *configPath == "" { + log.Fatal("--config must be supplied") + } + cfg, err := config.Load(*configPath) + if err != nil { + log.Fatalf("Invalid config file: %s", err) + } + + log.Info("config: ", cfg) + + db, err := storage.NewDatabase(string(cfg.Database.FederationSender)) + if err != nil { + log.Panicf("startup: failed to create federation sender database with data source %s : %s", cfg.Database.FederationSender, err) + } + + federation := gomatrixserverlib.NewFederationClient( + cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, + ) + + queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation) + + consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db) + if err != nil { + log.WithError(err).Panicf("startup: failed to create room server consumer") + } + if err = consumer.Start(); err != nil { + log.WithError(err).Panicf("startup: failed to start room server consumer") + } + + http.DefaultServeMux.Handle("/metrics", prometheus.Handler()) + + if err := http.ListenAndServe(string(cfg.Listen.FederationSender), nil); err != nil { + panic(err) + } +} diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index a4977731..58980ca3 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -122,16 +122,20 @@ type Dendrite struct { // The RoomServer database stores information about matrix rooms. // It is only accessed by the RoomServer. RoomServer DataSource `yaml:"room_server"` + // The FederationSender database stores information used by the FederationSender + // It is only accessed by the FederationSender. + FederationSender DataSource `yaml:"federation_sender"` } `yaml:"database"` // The internal addresses the components will listen on. // These should not be exposed externally as they expose metrics and debugging APIs. Listen struct { - MediaAPI Address `yaml:"media_api"` - ClientAPI Address `yaml:"client_api"` - FederationAPI Address `yaml:"federation_api"` - SyncAPI Address `yaml:"sync_api"` - RoomServer Address `yaml:"room_server"` + MediaAPI Address `yaml:"media_api"` + ClientAPI Address `yaml:"client_api"` + FederationAPI Address `yaml:"federation_api"` + SyncAPI Address `yaml:"sync_api"` + RoomServer Address `yaml:"room_server"` + FederationSender Address `yaml:"federation_sender"` } `yaml:"listen"` } diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go new file mode 100644 index 00000000..9ac0e988 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -0,0 +1,361 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "encoding/json" + "fmt" + "strings" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// OutputRoomEvent consumes events that originated in the room server. +type OutputRoomEvent struct { + roomServerConsumer *common.ContinualConsumer + db *storage.Database + queues *queue.OutgoingQueues + query api.RoomserverQueryAPI +} + +// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. +func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + return nil, err + } + roomServerURL := cfg.RoomServerURL() + + consumer := common.ContinualConsumer{ + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEvent{ + roomServerConsumer: &consumer, + db: store, + queues: queues, + query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *OutputRoomEvent) Start() error { + return s.roomServerConsumer.Start() +} + +// onMessage is called when the federation server receives a new event from the room server output log. +// It is unsafe to call this with messages for the same room in multiple gorountines +// because updates it will likely fail with a types.EventIDMismatchError when it +// realises that it cannot update the room state using the deltas. +func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error { + // Parse out the event JSON + var output api.OutputRoomEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(output.Event, false) + if err != nil { + log.WithError(err).Errorf("roomserver output log: event parse failure") + return nil + } + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_id": ev.RoomID(), + "send_as_server": output.SendAsServer, + }).Info("received event from roomserver") + + if err = s.processMessage(output, ev); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": output.AddsStateEventIDs, + "del": output.RemovesStateEventIDs, + }).Panicf("roomserver output log: write event failure") + return nil + } + + return nil +} + +// processMessage updates the list of currently joined hosts in the room +// and then sends the event to the hosts that were joined before the event. +func (s *OutputRoomEvent) processMessage(ore api.OutputRoomEvent, ev gomatrixserverlib.Event) error { + addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ev) + if err != nil { + return err + } + addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents) + if err != nil { + return err + } + // Update our copy of the current state. + // We keep a copy of the current state because the state at each event is + // expressed as a delta against the current state. + // TODO: handle EventIDMismatchError and recover the current state by talking + // to the roomserver + oldJoinedHosts, err := s.db.UpdateRoom( + ev.RoomID(), ore.LastSentEventID, ev.EventID(), + addsJoinedHosts, ore.RemovesStateEventIDs, + ) + if err != nil { + return err + } + + if ore.SendAsServer == api.DoNotSendToOtherServers { + // Ignore event that we don't need to send anywhere. + return nil + } + + // Work out which hosts were joined at the event itself. + joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, ev, oldJoinedHosts) + if err != nil { + return err + } + + // Send the event. + if err = s.queues.SendEvent( + &ev, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent, + ); err != nil { + return err + } + + return nil +} + +// joinedHostsAtEvent works out a list of matrix servers that were joined to +// the room at the event. +// It is important to use the state at the event for sending messages because: +// 1) We shouldn't send messages to servers that weren't in the room. +// 2) If a server is kicked from the rooms it should still be told about the +// kick event, +// Usually the list can be calculated locally, but sometimes it will need fetch +// events from the room server. +// Returns an error if there was a problem talking to the room server. +func (s *OutputRoomEvent) joinedHostsAtEvent( + ore api.OutputRoomEvent, ev gomatrixserverlib.Event, oldJoinedHosts []types.JoinedHost, +) ([]gomatrixserverlib.ServerName, error) { + // Combine the delta into a single delta so that the adds and removes can + // cancel each other out. This should reduce the number of times we need + // to fetch a state event from the room server. + combinedAdds, combinedRemoves := combineDeltas( + ore.AddsStateEventIDs, ore.RemovesStateEventIDs, + ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, + ) + combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ev) + if err != nil { + return nil, err + } + + combinedAddsJoinedHosts, err := joinedHostsFromEvents(combinedAddsEvents) + if err != nil { + return nil, err + } + + removed := map[string]bool{} + for _, eventID := range combinedRemoves { + removed[eventID] = true + } + + joined := map[gomatrixserverlib.ServerName]bool{} + for _, joinedHost := range oldJoinedHosts { + if removed[joinedHost.MemberEventID] { + // This m.room.member event is part of the current state of the + // room, but not part of the state at the event we are processing + // Therefore we can't use it to tell whether the server was in + // the room at the event. + continue + } + joined[joinedHost.ServerName] = true + } + + for _, joinedHost := range combinedAddsJoinedHosts { + // This m.room.member event was part of the state of the room at the + // event, but isn't part of the current state of the room now. + joined[joinedHost.ServerName] = true + } + + var result []gomatrixserverlib.ServerName + for serverName, include := range joined { + if include { + result = append(result, serverName) + } + } + return result, nil +} + +// joinedHostsFromEvents turns a list of state events into a list of joined hosts. +// This errors if one of the events was invalid. +// It should be impossible for an invalid event to get this far in the pipeline. +func joinedHostsFromEvents(evs []gomatrixserverlib.Event) ([]types.JoinedHost, error) { + var joinedHosts []types.JoinedHost + for _, ev := range evs { + if ev.Type() != "m.room.member" || ev.StateKey() == nil { + continue + } + var content struct { + Membership string `json:"membership"` + } + if err := json.Unmarshal(ev.Content(), &content); err != nil { + return nil, err + } + if content.Membership != "join" { + continue + } + serverName, err := domainFromID(*ev.StateKey()) + if err != nil { + return nil, err + } + joinedHosts = append(joinedHosts, types.JoinedHost{ + MemberEventID: ev.EventID(), ServerName: serverName, + }) + } + return joinedHosts, nil +} + +// combineDeltas combines two deltas into a single delta. +// Assumes that the order of operations is add(1), remove(1), add(2), remove(2). +// Removes duplicate entries and redundant operations from each delta. +func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []string) { + addSet := map[string]bool{} + removeSet := map[string]bool{} + + // combine processes each unique value in a list. + // If the value is in the removeFrom set then it is removed from that set. + // Otherwise it is added to the addTo set. + combine := func(values []string, removeFrom, addTo map[string]bool) { + processed := map[string]bool{} + for _, value := range values { + if processed[value] { + continue + } + processed[value] = true + if removeFrom[value] { + delete(removeFrom, value) + } else { + addTo[value] = true + } + } + } + + combine(adds1, nil, addSet) + combine(removes1, addSet, removeSet) + combine(adds2, removeSet, addSet) + combine(removes2, addSet, removeSet) + + for value := range addSet { + adds = append(adds, value) + } + for value := range removeSet { + removes = append(removes, value) + } + return +} + +// lookupStateEvents looks up the state events that are added by a new event. +func (s *OutputRoomEvent) lookupStateEvents( + addsStateEventIDs []string, event gomatrixserverlib.Event, +) ([]gomatrixserverlib.Event, error) { + // Fast path if there aren't any new state events. + if len(addsStateEventIDs) == 0 { + return nil, nil + } + + // Fast path if the only state event added is the event itself. + if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { + return []gomatrixserverlib.Event{event}, nil + } + + missing := addsStateEventIDs + var result []gomatrixserverlib.Event + + // Check if event itself is being added. + for _, eventID := range missing { + if eventID == event.EventID() { + result = append(result, event) + break + } + } + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) == 0 { + return result, nil + } + + // At this point the missing events are neither the event itself nor are + // they present in our local database. Our only option is to fetch them + // from the roomserver using the query API. + eventReq := api.QueryEventsByIDRequest{EventIDs: missing} + var eventResp api.QueryEventsByIDResponse + if err := s.query.QueryEventsByID(&eventReq, &eventResp); err != nil { + return nil, err + } + + result = append(result, eventResp.Events...) + missing = missingEventsFrom(result, addsStateEventIDs) + + if len(missing) != 0 { + return nil, fmt.Errorf( + "missing %d state events IDs at event %q", len(missing), event.EventID(), + ) + } + + return result, nil +} + +func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string { + have := map[string]bool{} + for _, event := range events { + have[event.EventID()] = true + } + var missing []string + for _, eventID := range required { + if !have[eventID] { + missing = append(missing, eventID) + } + } + return missing +} + +// domainFromID returns everything after the first ":" character to extract +// the domain part of a matrix ID. +// TODO: duplicated from gomatrixserverlib. +func domainFromID(id string) (gomatrixserverlib.ServerName, error) { + // IDs have the format: SIGIL LOCALPART ":" DOMAIN + // Split on the first ":" character since the domain can contain ":" + // characters. + parts := strings.SplitN(id, ":", 2) + if len(parts) != 2 { + // The ID must have a ":" character. + return "", fmt.Errorf("invalid ID: %q", id) + } + // Return everything after the first ":" character. + return gomatrixserverlib.ServerName(parts[1]), nil +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go new file mode 100644 index 00000000..bb659b9c --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver_test.go @@ -0,0 +1,53 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "testing" +) + +func TestCombineNoOp(t *testing.T) { + inputAdd1 := []string{"a", "b", "c"} + inputDel1 := []string{"a", "b", "d"} + inputAdd2 := []string{"a", "d", "e"} + inputDel2 := []string{"a", "c", "e", "e"} + + gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2) + + if len(gotAdd) != 0 { + t.Errorf("wanted combined adds to be an empty list, got %#v", gotAdd) + } + + if len(gotDel) != 0 { + t.Errorf("wanted combined removes to be an empty list, got %#v", gotDel) + } +} + +func TestCombineDedup(t *testing.T) { + inputAdd1 := []string{"a", "a"} + inputDel1 := []string{"b", "b"} + inputAdd2 := []string{"a", "a"} + inputDel2 := []string{"b", "b"} + + gotAdd, gotDel := combineDeltas(inputAdd1, inputDel1, inputAdd2, inputDel2) + + if len(gotAdd) != 1 || gotAdd[0] != "a" { + t.Errorf("wanted combined adds to be %#v, got %#v", []string{"a"}, gotAdd) + } + + if len(gotDel) != 1 || gotDel[0] != "b" { + t.Errorf("wanted combined removes to be %#v, got %#v", []string{"b"}, gotDel) + } +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go new file mode 100644 index 00000000..bb274b08 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go @@ -0,0 +1,105 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "fmt" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/gomatrixserverlib" +) + +// destinationQueue is a queue of events for a single destination. +// It is responsible for sending the events to the destination and +// ensures that only one request is in flight to a given destination +// at a time. +type destinationQueue struct { + client *gomatrixserverlib.FederationClient + origin gomatrixserverlib.ServerName + destination gomatrixserverlib.ServerName + // The running mutex protects running, sentCounter, lastTransactionIDs and + // pendingEvents. + runningMutex sync.Mutex + running bool + sentCounter int + lastTransactionIDs []gomatrixserverlib.TransactionID + pendingEvents []*gomatrixserverlib.Event +} + +// Send event adds the event to the pending queue for the destination. +// If the queue is empty then it starts a background goroutine to +// start sending events to that destination. +func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + oq.pendingEvents = append(oq.pendingEvents, ev) + if !oq.running { + oq.running = true + go oq.backgroundSend() + } +} + +func (oq *destinationQueue) backgroundSend() { + for { + t := oq.next() + if t == nil { + // If the queue is empty then stop processing for this destination. + // TODO: Remove this destination from the queue map. + return + } + + // TODO: handle retries. + // TODO: blacklist uncooperative servers. + + _, err := oq.client.SendTransaction(*t) + if err != nil { + log.WithFields(log.Fields{ + "destination": oq.destination, + log.ErrorKey: err, + }).Info("problem sending transaction") + } + } +} + +// next creates a new transaction from the pending event queue +// and flushes the queue. +// Returns nil if the queue was empty. +func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + if len(oq.pendingEvents) == 0 { + oq.running = false + return nil + } + var t gomatrixserverlib.Transaction + now := gomatrixserverlib.AsTimestamp(time.Now()) + t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter)) + t.Origin = oq.origin + t.Destination = oq.destination + t.OriginServerTS = now + t.PreviousIDs = oq.lastTransactionIDs + if t.PreviousIDs == nil { + t.PreviousIDs = []gomatrixserverlib.TransactionID{} + } + oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID} + for _, pdu := range oq.pendingEvents { + t.PDUs = append(t.PDUs, *pdu) + } + oq.pendingEvents = nil + oq.sentCounter += len(t.PDUs) + return &t +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go new file mode 100644 index 00000000..79f019fd --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/queue/queue.go @@ -0,0 +1,95 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "fmt" + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/gomatrixserverlib" +) + +// OutgoingQueues is a collection of queues for sending transactions to other +// matrix servers +type OutgoingQueues struct { + origin gomatrixserverlib.ServerName + client *gomatrixserverlib.FederationClient + // The queuesMutex protects queues + queuesMutex sync.Mutex + queues map[gomatrixserverlib.ServerName]*destinationQueue +} + +// NewOutgoingQueues makes a new OutgoingQueues +func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues { + return &OutgoingQueues{ + origin: origin, + client: client, + queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, + } +} + +// SendEvent sends an event to the destinations +func (oqs *OutgoingQueues) SendEvent( + ev *gomatrixserverlib.Event, origin gomatrixserverlib.ServerName, + destinations []gomatrixserverlib.ServerName, +) error { + if origin != oqs.origin { + // TODO: Support virtual hosting by allowing us to send events using + // different origin server names. + // For now assume we are always asked to send as the single server configured + // in the dendrite config. + return fmt.Errorf( + "sendevent: unexpected server to send as: got %q expected %q", + origin, oqs.origin, + ) + } + + // Remove our own server from the list of destinations. + destinations = filterDestinations(oqs.origin, destinations) + + log.WithFields(log.Fields{ + "destinations": destinations, "event": ev.EventID(), + }).Info("Sending event") + + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + for _, destination := range destinations { + oq := oqs.queues[destination] + if oq == nil { + oq = &destinationQueue{ + origin: oqs.origin, + destination: destination, + client: oqs.client, + } + oqs.queues[destination] = oq + } + oq.sendEvent(ev) + } + return nil +} + +// filterDestinations removes our own server from the list of destinations. +// Otherwise we could end up trying to talk to ourselves. +func filterDestinations(origin gomatrixserverlib.ServerName, destinations []gomatrixserverlib.ServerName) []gomatrixserverlib.ServerName { + var result []gomatrixserverlib.ServerName + for _, destination := range destinations { + if destination == origin { + continue + } + result = append(result, destination) + } + return result +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go new file mode 100644 index 00000000..1b5c65dd --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go @@ -0,0 +1,111 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/lib/pq" + "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const joinedHostsSchema = ` +-- The joined_hosts table stores a list of m.room.member event ids in the +-- current state for each room where the membership is "join". +-- There will be an entry for every user that is joined to the room. +CREATE TABLE IF NOT EXISTS joined_hosts ( + -- The string ID of the room. + room_id TEXT NOT NULL, + -- The event ID of the m.room.member join event. + event_id TEXT NOT NULL, + -- The domain part of the user ID the m.room.member event is for. + server_name TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS joined_hosts_event_id_idx + ON joined_hosts (event_id); + +CREATE INDEX IF NOT EXISTS joined_hosts_room_id_idx + ON joined_hosts (room_id) +` + +const insertJoinedHostsSQL = "" + + "INSERT INTO joined_hosts (room_id, event_id, server_name)" + + " VALUES ($1, $2, $3)" + +const deleteJoinedHostsSQL = "" + + "DELETE FROM joined_hosts WHERE event_id = ANY($1)" + +const selectJoinedHostsSQL = "" + + "SELECT event_id, server_name FROM joined_hosts" + + " WHERE room_id = $1" + +type joinedHostsStatements struct { + insertJoinedHostsStmt *sql.Stmt + deleteJoinedHostsStmt *sql.Stmt + selectJoinedHostsStmt *sql.Stmt +} + +func (s *joinedHostsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(joinedHostsSchema) + if err != nil { + return + } + if s.insertJoinedHostsStmt, err = db.Prepare(insertJoinedHostsSQL); err != nil { + return + } + if s.deleteJoinedHostsStmt, err = db.Prepare(deleteJoinedHostsSQL); err != nil { + return + } + if s.selectJoinedHostsStmt, err = db.Prepare(selectJoinedHostsSQL); err != nil { + return + } + return +} + +func (s *joinedHostsStatements) insertJoinedHosts( + txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName, +) error { + _, err := txn.Stmt(s.insertJoinedHostsStmt).Exec(roomID, eventID, serverName) + return err +} + +func (s *joinedHostsStatements) deleteJoinedHosts(txn *sql.Tx, eventIDs []string) error { + _, err := txn.Stmt(s.deleteJoinedHostsStmt).Exec(pq.StringArray(eventIDs)) + return err +} + +func (s *joinedHostsStatements) selectJoinedHosts(txn *sql.Tx, roomID string, +) ([]types.JoinedHost, error) { + rows, err := txn.Stmt(s.selectJoinedHostsStmt).Query(roomID) + if err != nil { + return nil, err + } + defer rows.Close() + + var result []types.JoinedHost + for rows.Next() { + var eventID, serverName string + if err = rows.Scan(&eventID, &serverName); err != nil { + return nil, err + } + result = append(result, types.JoinedHost{ + MemberEventID: eventID, + ServerName: gomatrixserverlib.ServerName(serverName), + }) + } + return result, nil +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go new file mode 100644 index 00000000..e11ed421 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/room_table.go @@ -0,0 +1,89 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" +) + +const roomSchema = ` +CREATE TABLE IF NOT EXISTS rooms ( + -- The string ID of the room + room_id TEXT PRIMARY KEY, + -- The most recent event state by the room server. + -- We can use this to tell if our view of the room state has become + -- desynchronised. + last_event_id TEXT NOT NULL +);` + +const insertRoomSQL = "" + + "INSERT INTO rooms (room_id, last_event_id) VALUES ($1, '')" + + " ON CONFLICT DO NOTHING" + +const selectRoomForUpdateSQL = "" + + "SELECT last_event_id FROM rooms WHERE room_id = $1 FOR UPDATE" + +const updateRoomSQL = "" + + "UPDATE rooms SET last_event_id = $2 WHERE room_id = $1" + +type roomStatements struct { + insertRoomStmt *sql.Stmt + selectRoomForUpdateStmt *sql.Stmt + updateRoomStmt *sql.Stmt +} + +func (s *roomStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(roomSchema) + if err != nil { + return + } + + if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil { + return + } + if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil { + return + } + if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil { + return + } + return +} + +// insertRoom inserts the room if it didn't already exist. +// If the room didn't exist then last_event_id is set to the empty string. +func (s *roomStatements) insertRoom(txn *sql.Tx, roomID string) error { + _, err := txn.Stmt(s.insertRoomStmt).Exec(roomID) + return err +} + +// selectRoomForUpdate locks the row for the room and returns the last_event_id. +// The row must already exist in the table. Callers can ensure that the row +// exists by calling insertRoom first. +func (s *roomStatements) selectRoomForUpdate(txn *sql.Tx, roomID string) (string, error) { + var lastEventID string + err := txn.Stmt(s.selectRoomForUpdateStmt).QueryRow(roomID).Scan(&lastEventID) + if err != nil { + return "", err + } + return lastEventID, nil +} + +// updateRoom updates the last_event_id for the room. selectRoomForUpdate should +// have already been called earlier within the transaction. +func (s *roomStatements) updateRoom(txn *sql.Tx, roomID, lastEventID string) error { + _, err := txn.Stmt(s.updateRoomStmt).Exec(roomID, lastEventID) + return err +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go new file mode 100644 index 00000000..2f98093e --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -0,0 +1,126 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/federationsender/types" +) + +// Database stores information needed by the federation sender +type Database struct { + joinedHostsStatements + roomStatements + common.PartitionOffsetStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + var err error + + if err = d.joinedHostsStatements.prepare(d.db); err != nil { + return err + } + + if err = d.roomStatements.prepare(d.db); err != nil { + return err + } + + if err = d.PartitionOffsetStatements.Prepare(d.db); err != nil { + return err + } + + return nil +} + +// PartitionOffsets implements common.PartitionStorer +func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.UpsertPartitionOffset(topic, partition, offset) +} + +// UpdateRoom updates the joined hosts for a room and returns what the joined +// hosts were before the update. +func (d *Database) UpdateRoom( + roomID, oldEventID, newEventID string, + addHosts []types.JoinedHost, + removeHosts []string, +) (joinedHosts []types.JoinedHost, err error) { + err = runTransaction(d.db, func(txn *sql.Tx) error { + if err = d.insertRoom(txn, roomID); err != nil { + return err + } + lastSentEventID, err := d.selectRoomForUpdate(txn, roomID) + if err != nil { + return err + } + if lastSentEventID != oldEventID { + return types.EventIDMismatchError{lastSentEventID, oldEventID} + } + joinedHosts, err = d.selectJoinedHosts(txn, roomID) + if err != nil { + return err + } + for _, add := range addHosts { + err = d.insertJoinedHosts(txn, roomID, add.MemberEventID, add.ServerName) + if err != nil { + return err + } + } + if err = d.deleteJoinedHosts(txn, removeHosts); err != nil { + return err + } + return d.updateRoom(txn, roomID, newEventID) + }) + return +} + +func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { + txn, err := db.Begin() + if err != nil { + return + } + defer func() { + if r := recover(); r != nil { + txn.Rollback() + panic(r) + } else if err != nil { + txn.Rollback() + } else { + err = txn.Commit() + } + }() + err = fn(txn) + return +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/types/types.go b/src/github.com/matrix-org/dendrite/federationsender/types/types.go new file mode 100644 index 00000000..05ba92f7 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/types/types.go @@ -0,0 +1,45 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "fmt" + + "github.com/matrix-org/gomatrixserverlib" +) + +// A JoinedHost is a server that is joined to a matrix room. +type JoinedHost struct { + // The MemberEventID of a m.room.member join event. + MemberEventID string + // The domain part of the state key of the m.room.member join event + ServerName gomatrixserverlib.ServerName +} + +// A EventIDMismatchError indicates that we have got out of sync with the +// room server. +type EventIDMismatchError struct { + // The event ID we have stored in our local database. + DatabaseID string + // The event ID received from the room server. + RoomServerID string +} + +func (e EventIDMismatchError) Error() string { + return fmt.Sprintf( + "mismatched last sent event ID: had %q in database got %q from room server", + e.DatabaseID, e.RoomServerID, + ) +}