diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index c3f18b08..7f048fca 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -18,13 +18,3 @@ type ClientAPI struct { // The URL of the roomserver which can service Query API requests RoomserverURL string } - -// Sync contains the config information necessary to spin up a sync-server process. -type Sync struct { - // The topic for events which are written by the room server output log. - RoomserverOutputTopic string `yaml:"roomserver_topic"` - // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. - KafkaConsumerURIs []string `yaml:"consumer_uris"` - // The postgres connection config for connecting to the database e.g a postgres:// URI - DataSource string `yaml:"database"` -} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index afb1b6a8..29595399 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -6,7 +6,6 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/producers" - "github.com/matrix-org/dendrite/clientapi/sync" "github.com/matrix-org/dendrite/clientapi/writers" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/util" @@ -48,17 +47,6 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } -// SetupSyncServerListeners configures the given mux with sync-server listeners -func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp *sync.RequestPool) { - apiMux := mux.NewRouter() - r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() - r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { - return srp.OnIncomingSyncRequest(req) - }))) - servMux.Handle("/metrics", prometheus.Handler()) - servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) -} - // make a util.JSONRequestHandler into an http.Handler func make(metricsName string, h util.JSONRequestHandler) http.Handler { return prometheus.InstrumentHandler(metricsName, util.MakeJSONAPI(h)) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index 106f8bf5..fe74679a 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -7,10 +7,11 @@ import ( "os" "path/filepath" - "github.com/matrix-org/dendrite/clientapi/config" - "github.com/matrix-org/dendrite/clientapi/routing" - "github.com/matrix-org/dendrite/clientapi/storage" - "github.com/matrix-org/dendrite/clientapi/sync" + "github.com/matrix-org/dendrite/syncserver/config" + "github.com/matrix-org/dendrite/syncserver/consumers" + "github.com/matrix-org/dendrite/syncserver/routing" + "github.com/matrix-org/dendrite/syncserver/storage" + "github.com/matrix-org/dendrite/syncserver/sync" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" @@ -79,7 +80,7 @@ func main() { log.Panicf("startup: Failed to create request pool : %s", err) } - server, err := sync.NewServer(cfg, rp, db) + server, err := consumers.NewServer(cfg, rp, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) } diff --git a/src/github.com/matrix-org/dendrite/common/types.go b/src/github.com/matrix-org/dendrite/common/types.go deleted file mode 100644 index 805d0c79..00000000 --- a/src/github.com/matrix-org/dendrite/common/types.go +++ /dev/null @@ -1 +0,0 @@ -package common diff --git a/src/github.com/matrix-org/dendrite/syncserver/config/config.go b/src/github.com/matrix-org/dendrite/syncserver/config/config.go new file mode 100644 index 00000000..e3c8219b --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncserver/config/config.go @@ -0,0 +1,11 @@ +package config + +// Sync contains the config information necessary to spin up a sync-server process. +type Sync struct { + // The topic for events which are written by the room server output log. + RoomserverOutputTopic string `yaml:"roomserver_topic"` + // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. + KafkaConsumerURIs []string `yaml:"consumer_uris"` + // The postgres connection config for connecting to the database e.g a postgres:// URI + DataSource string `yaml:"database"` +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/syncserver/consumers/roomserver.go similarity index 84% rename from src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go rename to src/github.com/matrix-org/dendrite/syncserver/consumers/roomserver.go index 142b6f11..8e5fe0ff 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncserver/consumers/roomserver.go @@ -1,14 +1,15 @@ -package sync +package consumers import ( "encoding/json" log "github.com/Sirupsen/logrus" - "github.com/matrix-org/dendrite/clientapi/config" - "github.com/matrix-org/dendrite/clientapi/storage" - "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncserver/config" + "github.com/matrix-org/dendrite/syncserver/storage" + "github.com/matrix-org/dendrite/syncserver/sync" + "github.com/matrix-org/dendrite/syncserver/types" "github.com/matrix-org/gomatrixserverlib" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -17,11 +18,11 @@ import ( type Server struct { roomServerConsumer *common.ContinualConsumer db *storage.SyncServerDatabase - rp *RequestPool + rp *sync.RequestPool } // NewServer creates a new sync server. Call Start() to begin consuming from room servers. -func NewServer(cfg *config.Sync, rp *RequestPool, store *storage.SyncServerDatabase) (*Server, error) { +func NewServer(cfg *config.Sync, rp *sync.RequestPool, store *storage.SyncServerDatabase) (*Server, error) { kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) if err != nil { return nil, err @@ -81,7 +82,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { }).Panicf("roomserver output log: write event failure") return nil } - s.rp.OnNewEvent(&ev, syncapi.StreamPosition(syncStreamPos)) + s.rp.OnNewEvent(&ev, types.StreamPosition(syncStreamPos)) return nil } diff --git a/src/github.com/matrix-org/dendrite/syncserver/routing/routing.go b/src/github.com/matrix-org/dendrite/syncserver/routing/routing.go new file mode 100644 index 00000000..c6171fff --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncserver/routing/routing.go @@ -0,0 +1,29 @@ +package routing + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/syncserver/config" + "github.com/matrix-org/dendrite/syncserver/sync" + "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" +) + +const pathPrefixR0 = "/_matrix/client/r0" + +// SetupSyncServerListeners configures the given mux with sync-server listeners +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp *sync.RequestPool) { + apiMux := mux.NewRouter() + r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() + r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { + return srp.OnIncomingSyncRequest(req) + }))) + servMux.Handle("/metrics", prometheus.Handler()) + servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) +} + +// make a util.JSONRequestHandler into an http.Handler +func make(metricsName string, h util.JSONRequestHandler) http.Handler { + return prometheus.InstrumentHandler(metricsName, util.MakeJSONAPI(h)) +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncserver/storage/current_room_state_table.go similarity index 100% rename from src/github.com/matrix-org/dendrite/clientapi/storage/current_room_state_table.go rename to src/github.com/matrix-org/dendrite/syncserver/storage/current_room_state_table.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go similarity index 100% rename from src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go rename to src/github.com/matrix-org/dendrite/syncserver/storage/output_room_events_table.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go similarity index 89% rename from src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go rename to src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go index 4b92ce67..b7d63d83 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncserver/storage/syncserver.go @@ -4,8 +4,8 @@ import ( "database/sql" // Import the postgres database driver. _ "github.com/lib/pq" - "github.com/matrix-org/dendrite/clientapi/sync/syncapi" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -42,14 +42,14 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns the sync stream position for the inserted event. // Returns an error if there was a problem inserting this event. -func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos syncapi.StreamPosition, returnErr error) { +func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos types.StreamPosition, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { var err error pos, err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) if err != nil { return err } - streamPos = syncapi.StreamPosition(pos) + streamPos = types.StreamPosition(pos) if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. @@ -88,16 +88,16 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o } // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. -func (d *SyncServerDatabase) SyncStreamPosition() (syncapi.StreamPosition, error) { +func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) { id, err := d.events.MaxID() if err != nil { - return syncapi.StreamPosition(0), err + return types.StreamPosition(0), err } - return syncapi.StreamPosition(id), nil + return types.StreamPosition(id), nil } // EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos. -func (d *SyncServerDatabase) EventsInRange(oldPos, newPos syncapi.StreamPosition) ([]gomatrixserverlib.Event, error) { +func (d *SyncServerDatabase) EventsInRange(oldPos, newPos types.StreamPosition) ([]gomatrixserverlib.Event, error) { return d.events.InRange(int64(oldPos), int64(newPos)) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go similarity index 89% rename from src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go rename to src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go index e68606c2..e4dd65b9 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncserver/sync/requestpool.go @@ -10,8 +10,8 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/storage" - "github.com/matrix-org/dendrite/clientapi/sync/syncapi" + "github.com/matrix-org/dendrite/syncserver/storage" + "github.com/matrix-org/dendrite/syncserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -21,7 +21,7 @@ const defaultSyncTimeout = time.Duration(30) * time.Second type syncRequest struct { userID string timeout time.Duration - since syncapi.StreamPosition + since types.StreamPosition wantFullState bool } @@ -29,7 +29,7 @@ type syncRequest struct { type RequestPool struct { db *storage.SyncServerDatabase // The latest sync stream position: guarded by 'cond'. - currPos syncapi.StreamPosition + currPos types.StreamPosition // A condition variable to notify all waiting goroutines of a new sync stream position cond *sync.Cond } @@ -40,7 +40,7 @@ func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) { if err != nil { return nil, err } - return &RequestPool{db, syncapi.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil + return &RequestPool{db, types.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -115,7 +115,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons // OnNewEvent is called when a new event is received from the room server. Must only be // called from a single goroutine, to avoid races between updates which could set the // current position in the stream incorrectly. -func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncapi.StreamPosition) { +func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos types.StreamPosition) { // update the current position in a guard and then notify all /sync streams rp.cond.L.Lock() rp.currPos = pos @@ -124,7 +124,7 @@ func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncapi.Strea rp.cond.Broadcast() // notify ALL waiting goroutines } -func (rp *RequestPool) waitForEvents(req syncRequest) syncapi.StreamPosition { +func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition { // In a guard, check if the /sync request should block, and block it until we get a new position rp.cond.L.Lock() currentPos := rp.currPos @@ -138,7 +138,7 @@ func (rp *RequestPool) waitForEvents(req syncRequest) syncapi.StreamPosition { return currentPos } -func (rp *RequestPool) currentSyncForUser(req syncRequest) (*syncapi.Response, error) { +func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { currentPos := rp.waitForEvents(req) // TODO: handle ignored users @@ -158,7 +158,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*syncapi.Response, e return nil, err } - res := syncapi.NewResponse() + res := types.NewResponse() // for now, dump everything as join timeline events for _, ev := range events { roomData := res.Rooms.Join[ev.RoomID()] @@ -183,13 +183,13 @@ func getTimeout(timeoutMS string) time.Duration { return time.Duration(i) * time.Millisecond } -func getSyncStreamPosition(since string) (syncapi.StreamPosition, error) { +func getSyncStreamPosition(since string) (types.StreamPosition, error) { if since == "" { - return syncapi.StreamPosition(0), nil + return types.StreamPosition(0), nil } i, err := strconv.Atoi(since) if err != nil { - return syncapi.StreamPosition(0), err + return types.StreamPosition(0), err } - return syncapi.StreamPosition(i), nil + return types.StreamPosition(i), nil } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go b/src/github.com/matrix-org/dendrite/syncserver/types/types.go similarity index 99% rename from src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go rename to src/github.com/matrix-org/dendrite/syncserver/types/types.go index 24603c03..4a30326c 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncapi/syncapi.go +++ b/src/github.com/matrix-org/dendrite/syncserver/types/types.go @@ -1,4 +1,4 @@ -package syncapi +package types import ( "strconv"