Remove StopProcessingAfter from the roomserver consumer as it is unused (#186)

main
Mark Haines 2017-08-18 11:33:10 +01:00 committed by GitHub
parent ba8b5d8bf9
commit 877ea5cb62
1 changed files with 0 additions and 24 deletions

View File

@ -17,9 +17,6 @@ package input
import ( import (
"encoding/json" "encoding/json"
"fmt"
"sync/atomic"
"net/http" "net/http"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -35,15 +32,6 @@ type RoomserverInputAPI struct {
// The kafkaesque topic to output new room events to. // The kafkaesque topic to output new room events to.
// This is the name used in kafka to identify the stream to write events to. // This is the name used in kafka to identify the stream to write events to.
OutputRoomEventTopic string OutputRoomEventTopic string
// If non-nil then the API will stop processing messages after this
// many messages and will shutdown. Malformed messages are not in the count.
StopProcessingAfter *int64
// If not-nil then the API will call this to shutdown the server.
// If this is nil then the API will continue to process messsages even
// though StopProcessingAfter has been reached.
ShutdownCallback func(reason string)
// How many messages the consumer has processed.
processed int64
} }
// WriteOutputEvents implements OutputRoomEventWriter // WriteOutputEvents implements OutputRoomEventWriter
@ -72,18 +60,6 @@ func (r *RoomserverInputAPI) InputRoomEvents(
if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil { if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil {
return err return err
} }
// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
processed := atomic.AddInt64(&r.processed, 1)
// Check if we should stop processing.
// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
// If we try to stop processing after M message and we have N goroutines then we will process somewhere
// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
// last message. We could be more careful here but this is good enough for getting rough benchmarks.
if r.StopProcessingAfter != nil && processed >= int64(*r.StopProcessingAfter) {
if r.ShutdownCallback != nil {
r.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", r.processed))
}
}
} }
return nil return nil
} }