diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index 7f048fca..4f431bee 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -18,3 +18,11 @@ type ClientAPI struct { // The URL of the roomserver which can service Query API requests RoomserverURL string } + +// Sync contains the config information necessary to spin up a sync-server process. +type Sync struct { + // The topic for events which are written by the room server output log. + RoomserverOutputTopic string + // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. + KafkaConsumerURIs []string +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 1a085d8b..f1d2dd5f 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -23,9 +23,6 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { return writers.CreateRoom(req, cfg, producer) }))) - r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { - return readers.Sync(req) - }))) r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", make("send_message", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { vars := mux.Vars(req) @@ -51,6 +48,17 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } +// SetupSyncServer configures the given mux with sync-server listeners +func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { + apiMux := mux.NewRouter() + r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() + r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { + return readers.Sync(req) + }))) + servMux.Handle("/metrics", prometheus.Handler()) + servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) +} + // make a util.JSONRequestHandler into an http.Handler func make(metricsName string, h util.JSONRequestHandler) http.Handler { return prometheus.InstrumentHandler(metricsName, util.MakeJSONAPI(h)) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go new file mode 100644 index 00000000..c92e7096 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "net/http" + "os" + "path/filepath" + + "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/clientapi/routing" + + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dugong" +) + +func setupLogging(logDir string) { + _ = os.Mkdir(logDir, os.ModePerm) + log.AddHook(dugong.NewFSHook( + filepath.Join(logDir, "info.log"), + filepath.Join(logDir, "warn.log"), + filepath.Join(logDir, "error.log"), + &log.TextFormatter{ + TimestampFormat: "2006-01-02 15:04:05.000000", + DisableColors: true, + DisableTimestamp: false, + DisableSorting: false, + }, &dugong.DailyRotationSchedule{GZip: true}, + )) +} + +func main() { + bindAddr := os.Getenv("BIND_ADDRESS") + if bindAddr == "" { + log.Panic("No BIND_ADDRESS environment variable found.") + } + logDir := os.Getenv("LOG_DIR") + if logDir != "" { + setupLogging(logDir) + } + + cfg := config.Sync{ + KafkaConsumerURIs: []string{"localhost:9092"}, + RoomserverOutputTopic: "roomserverOutput", + } + + log.Info("Starting sync server") + + routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg) + log.Fatal(http.ListenAndServe(bindAddr, nil)) +}