diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index 3b46487a..34455ddb 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -15,35 +15,24 @@ package producers import ( - "encoding/json" - "fmt" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" - sarama "gopkg.in/Shopify/sarama.v1" ) // RoomserverProducer produces events for the roomserver to consume. type RoomserverProducer struct { - Topic string - Producer sarama.SyncProducer + InputAPI api.RoomserverInputAPI } // NewRoomserverProducer creates a new RoomserverProducer -func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProducer, error) { - producer, err := sarama.NewSyncProducer(kafkaURIs, nil) - if err != nil { - return nil, err - } +func NewRoomserverProducer(roomserverURI string) *RoomserverProducer { return &RoomserverProducer{ - Topic: topic, - Producer: producer, - }, nil + InputAPI: api.NewRoomserverInputAPIHTTP(roomserverURI, nil), + } } // SendEvents writes the given events to the roomserver input log. The events are written with KindNew. func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAsServer gomatrixserverlib.ServerName) error { - eventIDs := make([]string, len(events)) ires := make([]api.InputRoomEvent, len(events)) for i, event := range events { ires[i] = api.InputRoomEvent{ @@ -52,9 +41,8 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), } - eventIDs[i] = event.EventID() } - return c.SendInputRoomEvents(ires, eventIDs) + return c.SendInputRoomEvents(ires) } // SendEventWithState writes an event with KindNew to the roomserver input log @@ -65,7 +53,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat return err } - eventIDs := make([]string, len(outliers)+1) ires := make([]api.InputRoomEvent, len(outliers)+1) for i, outlier := range outliers { ires[i] = api.InputRoomEvent{ @@ -73,7 +60,6 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat Event: outlier, AuthEventIDs: outlier.AuthEventIDs(), } - eventIDs[i] = outlier.EventID() } stateEventIDs := make([]string, len(state.StateEvents)) @@ -88,41 +74,14 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat HasState: true, StateEventIDs: stateEventIDs, } - eventIDs[len(outliers)] = event.EventID() - return c.SendInputRoomEvents(ires, eventIDs) + return c.SendInputRoomEvents(ires) } // SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both // arrays must match, and each element must correspond to the same event. -func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error { - // TODO: Nicer way of doing this. Options are: - // A) Like this - // B) Add EventID field to InputRoomEvent - // C) Add wrapper struct with the EventID and the InputRoomEvent - if len(eventIDs) != len(ires) { - return fmt.Errorf("WriteInputRoomEvents: length mismatch %d != %d", len(eventIDs), len(ires)) - } - - msgs := make([]*sarama.ProducerMessage, len(ires)) - for i := range ires { - msg, err := c.toProducerMessage(ires[i], eventIDs[i]) - if err != nil { - return err - } - msgs[i] = msg - } - return c.Producer.SendMessages(msgs) -} - -func (c *RoomserverProducer) toProducerMessage(ire api.InputRoomEvent, eventID string) (*sarama.ProducerMessage, error) { - value, err := json.Marshal(ire) - if err != nil { - return nil, err - } - var m sarama.ProducerMessage - m.Topic = c.Topic - m.Key = sarama.StringEncoder(eventID) - m.Value = sarama.ByteEncoder(value) - return &m, nil +func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent) error { + request := api.InputRoomEventsRequest{InputRoomEvents: ires} + var response api.InputRoomEventsResponse + return c.InputAPI.InputRoomEvents(&request, &response) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 6699897f..eb1218e7 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -51,9 +51,9 @@ func main() { log.Info("config: ", cfg) - roomserverProducer, err := producers.NewRoomserverProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), - ) + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) + + roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL()) userUpdateProducer, err := producers.NewUserUpdateProducer( cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), ) @@ -65,7 +65,6 @@ func main() { cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, ) - queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName) if err != nil { log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error()) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go index f4f19cdc..a479ad55 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go @@ -67,9 +67,7 @@ func main() { queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) - roomserverProducer, err := producers.NewRoomserverProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), - ) + roomserverProducer := producers.NewRoomserverProducer(cfg.RoomServerURL()) if err != nil { log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go index 0a168679..715a4074 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go @@ -16,11 +16,9 @@ package main import ( "flag" - "fmt" "net/http" _ "net/http/pprof" "os" - "strconv" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" @@ -33,9 +31,8 @@ import ( ) var ( - logDir = os.Getenv("LOG_DIR") - configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.") - stopProcessingAfter = os.Getenv("STOP_AFTER") + logDir = os.Getenv("LOG_DIR") + configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.") ) func main() { @@ -56,49 +53,25 @@ func main() { panic(err) } - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - panic(err) - } - kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) if err != nil { panic(err) } - consumer := input.Consumer{ - ContinualConsumer: common.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.InputRoomEvent), - Consumer: kafkaConsumer, - PartitionStore: db, - }, - DB: db, - Producer: kafkaProducer, - OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent), - } - - if stopProcessingAfter != "" { - count, err := strconv.ParseInt(stopProcessingAfter, 10, 64) - if err != nil { - panic(err) - } - consumer.StopProcessingAfter = &count - consumer.ShutdownCallback = func(message string) { - fmt.Println("Stopping roomserver", message) - os.Exit(0) - } - } - - if err = consumer.Start(); err != nil { - panic(err) - } - queryAPI := query.RoomserverQueryAPI{ DB: db, } queryAPI.SetupHTTP(http.DefaultServeMux) + inputAPI := input.RoomserverInputAPI{ + DB: db, + Producer: kafkaProducer, + OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent), + } + + inputAPI.SetupHTTP(http.DefaultServeMux) + http.DefaultServeMux.Handle("/metrics", prometheus.Handler()) log.Info("Started room server on ", cfg.Listen.RoomServer) diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go index c4bea7f3..43305c2f 100644 --- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go @@ -23,6 +23,10 @@ import ( "strings" "time" + "encoding/json" + + "net/http" + "github.com/matrix-org/dendrite/common/test" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -90,7 +94,7 @@ func createDatabase(database string) error { // messages is reached or after a timeout. It kills the command before it returns. // It returns a list of the messages read from the command on success or an error // on failure. -func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAPI func()) ([]string, error) { +func runAndReadFromTopic(runCmd *exec.Cmd, readyURL string, doInput func(), topic string, count int, checkQueryAPI func()) ([]string, error) { type result struct { // data holds all of stdout on success. data []byte @@ -107,6 +111,11 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP ) // Send stderr to our stderr so the user can see any error messages. readCmd.Stderr = os.Stderr + + // Kill both processes before we exit. + defer func() { runCmd.Process.Kill() }() + defer func() { readCmd.Process.Kill() }() + // Run the command, read the messages and wait for a timeout in parallel. go func() { // Read all of stdout. @@ -131,14 +140,40 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP time.Sleep(timeout) done <- result{nil, fmt.Errorf("Timeout reading %d messages from topic %q", count, topic)} }() + + // Poll the HTTP listener of the process waiting for it to be ready to receive requests. + ready := make(chan struct{}) + go func() { + delay := 10 * time.Millisecond + for { + time.Sleep(delay) + if delay < 100*time.Millisecond { + delay *= 2 + } + resp, err := http.Get(readyURL) + if err != nil { + continue + } + if resp.StatusCode == 200 { + break + } + } + ready <- struct{}{} + }() + + // Wait for the roomserver to be ready to receive input or for it to crash. + select { + case <-ready: + case r := <-done: + return nil, r.err + } + + // Write the input now that the server is running. + doInput() + // Wait for one of the tasks to finsh. r := <-done - // Kill both processes. We don't check if the processes are running and - // we ignore failures since we are just trying to clean up before returning. - runCmd.Process.Kill() - readCmd.Process.Kill() - if r.err != nil { return nil, r.err } @@ -153,6 +188,20 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP return lines, nil } +func writeToRoomServer(input []string, roomserverURL string) error { + var request api.InputRoomEventsRequest + var response api.InputRoomEventsResponse + var err error + request.InputRoomEvents = make([]api.InputRoomEvent, len(input)) + for i := range input { + if err = json.Unmarshal([]byte(input[i]), &request.InputRoomEvents[i]); err != nil { + return err + } + } + x := api.NewRoomserverInputAPIHTTP(roomserverURL, nil) + return x.InputRoomEvents(&request, &response) +} + // testRoomserver is used to run integration tests against a single roomserver. // It creates new kafka topics for the input and output of the roomserver. // It writes the input messages to the input kafka topic, formatting each message @@ -176,24 +225,22 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R panic(err) } - inputTopic := string(cfg.Kafka.Topics.InputRoomEvent) outputTopic := string(cfg.Kafka.Topics.OutputRoomEvent) - exe.DeleteTopic(inputTopic) - if err := exe.CreateTopic(inputTopic); err != nil { - panic(err) - } exe.DeleteTopic(outputTopic) if err := exe.CreateTopic(outputTopic); err != nil { panic(err) } - if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(input)); err != nil { + if err = createDatabase(testDatabaseName); err != nil { panic(err) } - if err = createDatabase(testDatabaseName); err != nil { - panic(err) + doInput := func() { + fmt.Printf("Roomserver is ready to receive input, sending %d events\n", len(input)) + if err = writeToRoomServer(input, cfg.RoomServerURL()); err != nil { + panic(err) + } } cmd := exec.Command(filepath.Join(filepath.Dir(os.Args[0]), "dendrite-room-server")) @@ -205,7 +252,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R cmd.Stderr = os.Stderr cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)} - gotOutput, err := runAndReadFromTopic(cmd, outputTopic, len(wantOutput), func() { + gotOutput, err := runAndReadFromTopic(cmd, cfg.RoomServerURL()+"/metrics", doInput, outputTopic, len(wantOutput), func() { queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil) checkQueries(queryAPI) }) diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 311312a9..4b362b5f 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -19,13 +19,14 @@ import ( "crypto/sha256" "encoding/pem" "fmt" - "github.com/matrix-org/gomatrixserverlib" - "golang.org/x/crypto/ed25519" - "gopkg.in/yaml.v2" "io/ioutil" "path/filepath" "strings" "time" + + "github.com/matrix-org/gomatrixserverlib" + "golang.org/x/crypto/ed25519" + "gopkg.in/yaml.v2" ) // Version is the current version of the config format. @@ -95,8 +96,6 @@ type Dendrite struct { Addresses []string `yaml:"addresses"` // The names of the topics to use when reading and writing from kafka. Topics struct { - // Topic for roomserver/api.InputRoomEvent events. - InputRoomEvent Topic `yaml:"input_room_event"` // Topic for roomserver/api.OutputRoomEvent events. OutputRoomEvent Topic `yaml:"output_room_event"` // Topic for user updates (profile, presence) @@ -298,7 +297,6 @@ func (config *Dendrite) check() error { } checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) - checkNotEmpty("kafka.topics.input_room_event", string(config.Kafka.Topics.InputRoomEvent)) checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.device", string(config.Database.Device)) diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index 8c957e77..e429d06b 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -21,14 +21,15 @@ import ( "encoding/base64" "encoding/pem" "fmt" - "github.com/matrix-org/dendrite/common/config" - "github.com/matrix-org/gomatrixserverlib" - "gopkg.in/yaml.v2" "io/ioutil" "math/big" "os" "path/filepath" "time" + + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/gomatrixserverlib" + "gopkg.in/yaml.v2" ) const ( @@ -80,7 +81,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.Kafka.Addresses = []string{kafkaURI} // TODO: Different servers should be using different topics. // Make this configurable somehow? - cfg.Kafka.Topics.InputRoomEvent = "test.room.input" cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" // TODO: Use different databases for the different schemas. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go deleted file mode 100644 index efe45038..00000000 --- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package input contains the code that writes -package input - -import ( - "encoding/json" - "fmt" - "sync/atomic" - - "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/roomserver/api" - sarama "gopkg.in/Shopify/sarama.v1" -) - -// A ConsumerDatabase has the storage APIs needed by the consumer. -type ConsumerDatabase interface { - RoomEventDatabase - common.PartitionStorer -} - -// An ErrorLogger handles the errors encountered by the consumer. -type ErrorLogger interface { - OnError(message *sarama.ConsumerMessage, err error) -} - -// A Consumer consumes a kafkaesque stream of room events. -// The room events are supplied as api.InputRoomEvent structs serialised as JSON. -// The events should be valid matrix events. -// The events needed to authenticate the event should already be stored on the roomserver. -// The events needed to construct the state at the event should already be stored on the roomserver. -// If the event is not valid then it will be discarded and an error will be logged. -type Consumer struct { - ContinualConsumer common.ContinualConsumer - // The database used to store the room events. - DB ConsumerDatabase - Producer sarama.SyncProducer - // The kafkaesque topic to output new room events to. - // This is the name used in kafka to identify the stream to write events to. - OutputRoomEventTopic string - // The ErrorLogger for this consumer. - // If left as nil then the consumer will panic when it encounters an error - ErrorLogger ErrorLogger - // If non-nil then the consumer will stop processing messages after this - // many messages and will shutdown. Malformed messages are included in the count. - StopProcessingAfter *int64 - // If not-nil then the consumer will call this to shutdown the server. - ShutdownCallback func(reason string) - // How many messages the consumer has processed. - processed int64 -} - -// WriteOutputRoomEvent implements OutputRoomEventWriter -func (c *Consumer) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error { - var m sarama.ProducerMessage - oe := api.OutputEvent{ - Type: api.OutputTypeNewRoomEvent, - NewRoomEvent: &output, - } - value, err := json.Marshal(oe) - if err != nil { - return err - } - m.Topic = c.OutputRoomEventTopic - m.Key = sarama.StringEncoder("") - m.Value = sarama.ByteEncoder(value) - _, _, err = c.Producer.SendMessage(&m) - return err -} - -// Start starts the consumer consuming. -// Starts up a goroutine for each partition in the kafka stream. -// Returns nil once all the goroutines are started. -// Returns an error if it can't start consuming for any of the partitions. -func (c *Consumer) Start() error { - c.ContinualConsumer.ProcessMessage = c.processMessage - c.ContinualConsumer.ShutdownCallback = c.shutdown - return c.ContinualConsumer.Start() -} - -func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error { - var input api.InputRoomEvent - if err := json.Unmarshal(message.Value, &input); err != nil { - // If the message is invalid then log it and move onto the next message in the stream. - c.logError(message, err) - } else { - if err := processRoomEvent(c.DB, c, input); err != nil { - // If there was an error processing the message then log it and - // move onto the next message in the stream. - // TODO: If the error was due to a problem talking to the database - // then we shouldn't move onto the next message and we should either - // retry processing the message, or panic and kill ourselves. - c.logError(message, err) - } - } - // Update the number of processed messages using atomic addition because it is accessed from multiple goroutines. - processed := atomic.AddInt64(&c.processed, 1) - // Check if we should stop processing. - // Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages. - // If we try to stop processing after M message and we have N goroutines then we will process somewhere - // between M and (N + M) messages because the N goroutines could all try to process what they think will be the - // last message. We could be more careful here but this is good enough for getting rough benchmarks. - if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) { - return common.ErrShutdown - } - return nil -} - -func (c *Consumer) shutdown() { - if c.ShutdownCallback != nil { - c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed)) - } -} - -// logError is a convenience method for logging errors. -func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) { - if c.ErrorLogger == nil { - panic(err) - } - c.ErrorLogger.OnError(message, err) -} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index fcc5eb88..fcb414f0 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -16,12 +16,9 @@ package storage import ( "database/sql" - - "github.com/matrix-org/dendrite/common" ) type statements struct { - common.PartitionOffsetStatements eventTypeStatements eventStateKeyStatements roomStatements @@ -35,10 +32,6 @@ type statements struct { func (s *statements) prepare(db *sql.DB) error { var err error - if err = s.PartitionOffsetStatements.Prepare(db); err != nil { - return err - } - if err = s.eventTypeStatements.prepare(db); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 1dfc89d4..50e2f44d 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -18,7 +18,6 @@ import ( "database/sql" // Import the postgres database driver. _ "github.com/lib/pq" - "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -42,16 +41,6 @@ func Open(dataSourceName string) (*Database, error) { return &d, nil } -// PartitionOffsets implements input.ConsumerDatabase -func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { - return d.statements.SelectPartitionOffsets(topic) -} - -// SetPartitionOffset implements input.ConsumerDatabase -func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { - return d.statements.UpsertPartitionOffset(topic, partition, offset) -} - // StoreEvent implements input.EventDatabase func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error) { var (