diff --git a/vendor/manifest b/vendor/manifest index 425cc8f1..99bb98da 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -101,6 +101,12 @@ "revision": "768a8767051a4aca7f5e41f912954ae04d5f1efb", "branch": "master" }, + { + "importpath": "github.com/matrix-org/naffka", + "repository": "https://github.com/matrix-org/naffka", + "revision": "d28656e34f96a8eeaab53e3b7678c9ce14af5786", + "branch": "master" + }, { "importpath": "github.com/matrix-org/util", "repository": "https://github.com/matrix-org/util", diff --git a/vendor/src/github.com/matrix-org/naffka/README.md b/vendor/src/github.com/matrix-org/naffka/README.md new file mode 100644 index 00000000..8a7dd259 --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/README.md @@ -0,0 +1,5 @@ +# naffka + +Single in-process implementation of the [sarama golang kafka](https://github.com/Shopify/sarama) APIs. + +It's like Kafka, but a bit [naff](https://www.collinsdictionary.com/dictionary/english/naff). diff --git a/vendor/src/github.com/matrix-org/naffka/hooks/install.sh b/vendor/src/github.com/matrix-org/naffka/hooks/install.sh new file mode 100644 index 00000000..f8aa331f --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/hooks/install.sh @@ -0,0 +1,5 @@ +#! /bin/bash + +DOT_GIT="$(dirname $0)/../.git" + +ln -s "../../hooks/pre-commit" "$DOT_GIT/hooks/pre-commit" \ No newline at end of file diff --git a/vendor/src/github.com/matrix-org/naffka/hooks/pre-commit b/vendor/src/github.com/matrix-org/naffka/hooks/pre-commit new file mode 100644 index 00000000..a7ec4d01 --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/hooks/pre-commit @@ -0,0 +1,24 @@ +#! /bin/bash + +set -eu + +golint ./... +misspell -error . + +# gofmt doesn't exit with an error code if the files don't match the expected +# format. So we have to run it and see if it outputs anything. +if gofmt -l -s . 2>&1 | read +then + echo "Error: not all code had been formatted with gofmt." + echo "Fixing the following files" + gofmt -s -w -l . + echo + echo "Please add them to the commit" + git status --short + exit 1 +fi + +ineffassign . +go tool vet --all --shadow . +gocyclo -over 16 . +go test -timeout 5s . ./... diff --git a/vendor/src/github.com/matrix-org/naffka/memorydatabase.go b/vendor/src/github.com/matrix-org/naffka/memorydatabase.go new file mode 100644 index 00000000..05d1f3ee --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/memorydatabase.go @@ -0,0 +1,91 @@ +package naffka + +import ( + "fmt" + "sync" +) + +// A MemoryDatabase stores the message history as arrays in memory. +// It can be used to run unit tests. +// If the process is stopped then any messages that haven't been +// processed by a consumer are lost forever. +type MemoryDatabase struct { + topicsMutex sync.Mutex + topics map[string]*memoryDatabaseTopic +} + +type memoryDatabaseTopic struct { + messagesMutex sync.Mutex + messages []Message +} + +func (t *memoryDatabaseTopic) addMessages(msgs []Message) error { + t.messagesMutex.Lock() + defer t.messagesMutex.Unlock() + if int64(len(t.messages)) != msgs[0].Offset { + return fmt.Errorf("message offset %d is not immediately after the previous offset %d", msgs[0].Offset, len(t.messages)) + } + t.messages = append(t.messages, msgs...) + return nil +} + +// getMessages returns the current messages as a slice. +// This slice will have it's own copy of the length field so won't be affected +// by adding more messages in addMessages. +// The slice will share the same backing array with the slice we append new +// messages to. It is safe to read the messages in the backing array since we +// only append to the slice. It is not safe to write or append to the returned +// slice. +func (t *memoryDatabaseTopic) getMessages() []Message { + t.messagesMutex.Lock() + defer t.messagesMutex.Unlock() + return t.messages +} + +func (m *MemoryDatabase) getTopic(topicName string) *memoryDatabaseTopic { + m.topicsMutex.Lock() + defer m.topicsMutex.Unlock() + result := m.topics[topicName] + if result == nil { + result = &memoryDatabaseTopic{} + if m.topics == nil { + m.topics = map[string]*memoryDatabaseTopic{} + } + m.topics[topicName] = result + } + return result +} + +// StoreMessages implements Database +func (m *MemoryDatabase) StoreMessages(topic string, messages []Message) error { + if err := m.getTopic(topic).addMessages(messages); err != nil { + return err + } + return nil +} + +// FetchMessages implements Database +func (m *MemoryDatabase) FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error) { + messages := m.getTopic(topic).getMessages() + if endOffset > int64(len(messages)) { + return nil, fmt.Errorf("end offset %d out of range %d", endOffset, len(messages)) + } + if startOffset >= endOffset { + return nil, fmt.Errorf("start offset %d greater than or equal to end offset %d", startOffset, endOffset) + } + if startOffset < -1 { + return nil, fmt.Errorf("start offset %d less than -1", startOffset) + } + return messages[startOffset+1 : endOffset], nil +} + +// MaxOffsets implements Database +func (m *MemoryDatabase) MaxOffsets() (map[string]int64, error) { + m.topicsMutex.Lock() + defer m.topicsMutex.Unlock() + result := map[string]int64{} + for name, t := range m.topics { + result[name] = int64(len(t.getMessages())) - 1 + } + return result, nil +} diff --git a/vendor/src/github.com/matrix-org/naffka/naffka.go b/vendor/src/github.com/matrix-org/naffka/naffka.go new file mode 100644 index 00000000..d429ffda --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/naffka.go @@ -0,0 +1,360 @@ +package naffka + +import ( + "fmt" + "log" + "sync" + "time" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// Naffka is an implementation of the sarama kafka API designed to run within a +// single go process. It implements both the sarama.SyncProducer and the +// sarama.Consumer interfaces. This means it can act as a drop in replacement +// for kafka for testing or single instance deployment. +type Naffka struct { + db Database + topicsMutex sync.Mutex + topics map[string]*topic +} + +// New creates a new Naffka instance. +func New(db Database) (*Naffka, error) { + n := &Naffka{db: db, topics: map[string]*topic{}} + maxOffsets, err := db.MaxOffsets() + if err != nil { + return nil, err + } + for topicName, offset := range maxOffsets { + n.topics[topicName] = &topic{ + topicName: topicName, + nextOffset: offset + 1, + } + } + return n, nil +} + +// A Message is used internally within naffka to store messages. +// It is converted to a sarama.ConsumerMessage when exposed to the +// public APIs to maintain API compatibility with sarama. +type Message struct { + Offset int64 + Key []byte + Value []byte + Timestamp time.Time +} + +func (m *Message) consumerMessage(topic string) *sarama.ConsumerMessage { + return &sarama.ConsumerMessage{ + Topic: topic, + Offset: m.Offset, + Key: m.Key, + Value: m.Value, + Timestamp: m.Timestamp, + } +} + +// A Database is used to store naffka messages. +// Messages are stored so that new consumers can access the full message history. +type Database interface { + // StoreMessages stores a list of messages. + // Every message offset must be unique within each topic. + // Messages must be stored monotonically and contiguously for each topic. + // So for a given topic the message with offset n+1 is stored after the + // the message with offset n. + StoreMessages(topic string, messages []Message) error + // FetchMessages fetches all messages with an offset greater than but not + // including startOffset and less than but not including endOffset. + // The range of offsets requested must not overlap with those stored by a + // concurrent StoreMessages. The message offsets within the requested range + // are contigous. That is FetchMessage("foo", n, m) will only be called + // once the messages between n and m have been stored by StoreMessages. + // Every call must return at least one message. That is there must be at + // least one message between the start and offset. + FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error) + // MaxOffsets returns the maximum offset for each topic. + MaxOffsets() (map[string]int64, error) +} + +// SendMessage implements sarama.SyncProducer +func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + err = n.SendMessages([]*sarama.ProducerMessage{msg}) + return msg.Partition, msg.Offset, err +} + +// SendMessages implements sarama.SyncProducer +func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error { + byTopic := map[string][]*sarama.ProducerMessage{} + for _, msg := range msgs { + byTopic[msg.Topic] = append(byTopic[msg.Topic], msg) + } + var topicNames []string + for topicName := range byTopic { + topicNames = append(topicNames, topicName) + } + + now := time.Now() + topics := n.getTopics(topicNames) + for topicName := range byTopic { + if err := topics[topicName].send(now, byTopic[topicName]); err != nil { + return err + } + } + return nil +} + +func (n *Naffka) getTopics(topicNames []string) map[string]*topic { + n.topicsMutex.Lock() + defer n.topicsMutex.Unlock() + result := map[string]*topic{} + for _, topicName := range topicNames { + t := n.topics[topicName] + if t == nil { + // If the topic doesn't already exist then create it. + t = &topic{db: n.db, topicName: topicName} + n.topics[topicName] = t + } + result[topicName] = t + } + return result +} + +// Topics implements sarama.Consumer +func (n *Naffka) Topics() ([]string, error) { + n.topicsMutex.Lock() + defer n.topicsMutex.Unlock() + var result []string + for topic := range n.topics { + result = append(result, topic) + } + return result, nil +} + +// Partitions implements sarama.Consumer +func (n *Naffka) Partitions(topic string) ([]int32, error) { + // Naffka stores a single partition per topic, so this always returns a single partition ID. + return []int32{0}, nil +} + +// ConsumePartition implements sarama.Consumer +func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { + if partition != 0 { + return nil, fmt.Errorf("Unknown partition ID %d", partition) + } + topics := n.getTopics([]string{topic}) + return topics[topic].consume(offset), nil +} + +// HighWaterMarks implements sarama.Consumer +func (n *Naffka) HighWaterMarks() map[string]map[int32]int64 { + n.topicsMutex.Lock() + defer n.topicsMutex.Unlock() + result := map[string]map[int32]int64{} + for topicName, topic := range n.topics { + result[topicName] = map[int32]int64{ + 0: topic.highwaterMark(), + } + } + return result +} + +// Close implements sarama.SyncProducer and sarama.Consumer +func (n *Naffka) Close() error { + return nil +} + +const channelSize = 1024 + +type partitionConsumer struct { + topic *topic + messages chan *sarama.ConsumerMessage + // Whether the consumer is ready for new messages or whether it + // is catching up on historic messages. + // Reads and writes to this field are proctected by the topic mutex. + ready bool +} + +// AsyncClose implements sarama.PartitionConsumer +func (c *partitionConsumer) AsyncClose() { +} + +// Close implements sarama.PartitionConsumer +func (c *partitionConsumer) Close() error { + // TODO: Add support for performing a clean shutdown of the consumer. + return nil +} + +// Messages implements sarama.PartitionConsumer +func (c *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { + return c.messages +} + +// Errors implements sarama.PartitionConsumer +func (c *partitionConsumer) Errors() <-chan *sarama.ConsumerError { + // TODO: Add option to pass consumer errors to an errors channel. + return nil +} + +// HighWaterMarkOffset implements sarama.PartitionConsumer +func (c *partitionConsumer) HighWaterMarkOffset() int64 { + return c.topic.highwaterMark() +} + +// block writes the message to the consumer blocking until the consumer is ready +// to add the message to the channel. Once the message is successfully added to +// the channel it will catch up by pulling historic messsages from the database. +func (c *partitionConsumer) block(cmsg *sarama.ConsumerMessage) { + c.messages <- cmsg + c.catchup(cmsg.Offset) +} + +// catchup reads historic messages from the database until the consumer has caught +// up on all the historic messages. +func (c *partitionConsumer) catchup(fromOffset int64) { + for { + // First check if we have caught up. + caughtUp, nextOffset := c.topic.hasCaughtUp(c, fromOffset) + if caughtUp { + return + } + // Limit the number of messages we request from the database to be the + // capacity of the channel. + if nextOffset > fromOffset+int64(cap(c.messages)) { + nextOffset = fromOffset + int64(cap(c.messages)) + } + // Fetch the messages from the database. + msgs, err := c.topic.db.FetchMessages(c.topic.topicName, fromOffset, nextOffset) + if err != nil { + // TODO: Add option to write consumer errors to an errors channel + // as an alternative to logging the errors. + log.Print("Error reading messages: ", err) + // Wait before retrying. + // TODO: Maybe use an exponentional backoff scheme here. + // TODO: This timeout should take account of all the other goroutines + // that might be doing the same thing. (If there are a 10000 consumers + // then we don't want to end up retrying every millisecond) + time.Sleep(10 * time.Second) + continue + } + if len(msgs) == 0 { + // This should only happen if the database is corrupted and has lost the + // messages between the requested offsets. + log.Fatalf("Corrupt database returned no messages between %d and %d", fromOffset, nextOffset) + } + + // Pass the messages into the consumer channel. + // Blocking each write until the channel has enough space for the message. + for i := range msgs { + c.messages <- msgs[i].consumerMessage(c.topic.topicName) + } + // Update our the offset for the next loop iteration. + fromOffset = msgs[len(msgs)-1].Offset + } +} + +type topic struct { + db Database + topicName string + mutex sync.Mutex + consumers []*partitionConsumer + nextOffset int64 +} + +func (t *topic) send(now time.Time, pmsgs []*sarama.ProducerMessage) error { + var err error + // Encode the message keys and values. + msgs := make([]Message, len(pmsgs)) + for i := range msgs { + if pmsgs[i].Key != nil { + msgs[i].Key, err = pmsgs[i].Key.Encode() + if err != nil { + return err + } + } + if pmsgs[i].Value != nil { + msgs[i].Value, err = pmsgs[i].Value.Encode() + if err != nil { + return err + } + } + pmsgs[i].Timestamp = now + msgs[i].Timestamp = now + } + // Take the lock before assigning the offsets. + t.mutex.Lock() + defer t.mutex.Unlock() + offset := t.nextOffset + for i := range msgs { + pmsgs[i].Offset = offset + msgs[i].Offset = offset + offset++ + } + // Store the messages while we hold the lock. + err = t.db.StoreMessages(t.topicName, msgs) + if err != nil { + return err + } + t.nextOffset = offset + + // Now notify the consumers about the messages. + for i := range msgs { + cmsg := msgs[i].consumerMessage(t.topicName) + for _, c := range t.consumers { + if c.ready { + select { + case c.messages <- cmsg: + default: + // The consumer wasn't ready to receive a message because + // the channel buffer was full. + // Fork a goroutine to send the message so that we don't + // block sending messages to the other consumers. + c.ready = false + go c.block(cmsg) + } + } + } + } + + return nil +} + +func (t *topic) consume(offset int64) *partitionConsumer { + t.mutex.Lock() + defer t.mutex.Unlock() + c := &partitionConsumer{ + topic: t, + } + // Handle special offsets. + if offset == sarama.OffsetNewest { + offset = t.nextOffset + } + if offset == sarama.OffsetOldest { + offset = -1 + } + c.messages = make(chan *sarama.ConsumerMessage, channelSize) + t.consumers = append(t.consumers, c) + // Start catching up on historic messages in the background. + go c.catchup(offset) + return c +} + +func (t *topic) hasCaughtUp(c *partitionConsumer, offset int64) (bool, int64) { + t.mutex.Lock() + defer t.mutex.Unlock() + // Check if we have caught up while holding a lock on the topic so there + // isn't a way for our check to race with a new message being sent on the topic. + if offset+1 == t.nextOffset { + // We've caught up, the consumer can now receive messages as they are + // sent rather than fetching them from the database. + c.ready = true + return true, t.nextOffset + } + return false, t.nextOffset +} + +func (t *topic) highwaterMark() int64 { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.nextOffset +} diff --git a/vendor/src/github.com/matrix-org/naffka/naffka_test.go b/vendor/src/github.com/matrix-org/naffka/naffka_test.go new file mode 100644 index 00000000..d1a26710 --- /dev/null +++ b/vendor/src/github.com/matrix-org/naffka/naffka_test.go @@ -0,0 +1,86 @@ +package naffka + +import ( + "testing" + "time" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +func TestSendAndReceive(t *testing.T) { + naffka, err := New(&MemoryDatabase{}) + if err != nil { + t.Fatal(err) + } + producer := sarama.SyncProducer(naffka) + consumer := sarama.Consumer(naffka) + const topic = "testTopic" + const value = "Hello, World" + + c, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest) + if err != nil { + t.Fatal(err) + } + + message := sarama.ProducerMessage{ + Value: sarama.StringEncoder(value), + Topic: topic, + } + + if _, _, err = producer.SendMessage(&message); err != nil { + t.Fatal(err) + } + + var result *sarama.ConsumerMessage + select { + case result = <-c.Messages(): + case _ = <-time.NewTimer(10 * time.Second).C: + t.Fatal("expected to receive a message") + } + + if string(result.Value) != value { + t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value)) + } + + select { + case result = <-c.Messages(): + t.Fatal("expected to only receive one message") + default: + } +} + +func TestDelayedReceive(t *testing.T) { + naffka, err := New(&MemoryDatabase{}) + if err != nil { + t.Fatal(err) + } + producer := sarama.SyncProducer(naffka) + consumer := sarama.Consumer(naffka) + const topic = "testTopic" + const value = "Hello, World" + + message := sarama.ProducerMessage{ + Value: sarama.StringEncoder(value), + Topic: topic, + } + + if _, _, err = producer.SendMessage(&message); err != nil { + t.Fatal(err) + } + + c, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest) + if err != nil { + t.Fatal(err) + } + + var result *sarama.ConsumerMessage + select { + case result = <-c.Messages(): + case _ = <-time.NewTimer(10 * time.Second).C: + t.Fatal("expected to receive a message") + } + + if string(result.Value) != value { + t.Fatalf("wrong value: wanted %q got %q", value, string(result.Value)) + } +}