Replace the cmd specific config with common config. (#144)
* Move all the dendrite config in to a single place * Add tests for config parsing * replace syncserver config with common config * Replace client API config with common config * Replace federation API config with common config * Replace media api config with common config * Replace room server config with common config * Remove unused readKey function * Fix the integration tests * Comment on hardcoding roomserver to HTTP * Add a method for getting RoomServerURL This moves the hardcoding of HTTPs into one place.main
parent
3fbe728d94
commit
e67f9401be
|
@ -1,37 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package config
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"golang.org/x/crypto/ed25519"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ClientAPI contains the config information necessary to spin up a clientapi process.
|
|
||||||
type ClientAPI struct {
|
|
||||||
// The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
|
|
||||||
ServerName gomatrixserverlib.ServerName
|
|
||||||
// The private key which will be used to sign events.
|
|
||||||
PrivateKey ed25519.PrivateKey
|
|
||||||
// An arbitrary string used to uniquely identify the PrivateKey. Must start with the
|
|
||||||
// prefix "ed25519:".
|
|
||||||
KeyID gomatrixserverlib.KeyID
|
|
||||||
// A list of URIs to send events to. These kafka logs should be consumed by a Room Server.
|
|
||||||
KafkaProducerURIs []string
|
|
||||||
// The topic for events which are written to the logs.
|
|
||||||
ClientAPIOutputTopic string
|
|
||||||
// The URL of the roomserver which can service Query API requests
|
|
||||||
RoomserverURL string
|
|
||||||
}
|
|
|
@ -21,9 +21,9 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth"
|
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/clientapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
@ -56,7 +56,10 @@ func passwordLogin() loginFlows {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Login implements GET and POST /login
|
// Login implements GET and POST /login
|
||||||
func Login(req *http.Request, accountDB *accounts.Database, deviceDB *devices.Database, cfg config.ClientAPI) util.JSONResponse {
|
func Login(
|
||||||
|
req *http.Request, accountDB *accounts.Database, deviceDB *devices.Database,
|
||||||
|
cfg config.Dendrite,
|
||||||
|
) util.JSONResponse {
|
||||||
if req.Method == "GET" { // TODO: support other forms of login other than password, depending on config options
|
if req.Method == "GET" { // TODO: support other forms of login other than password, depending on config options
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
|
@ -109,7 +112,7 @@ func Login(req *http.Request, accountDB *accounts.Database, deviceDB *devices.Da
|
||||||
JSON: loginResponse{
|
JSON: loginResponse{
|
||||||
UserID: dev.UserID,
|
UserID: dev.UserID,
|
||||||
AccessToken: dev.AccessToken,
|
AccessToken: dev.AccessToken,
|
||||||
HomeServer: cfg.ServerName,
|
HomeServer: cfg.Matrix.ServerName,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,11 +22,11 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/clientapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/clientapi/readers"
|
"github.com/matrix-org/dendrite/clientapi/readers"
|
||||||
"github.com/matrix-org/dendrite/clientapi/writers"
|
"github.com/matrix-org/dendrite/clientapi/writers"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
@ -38,7 +38,7 @@ const pathPrefixR0 = "/_matrix/client/r0"
|
||||||
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
||||||
// to clients which need to make outbound HTTP requests.
|
// to clients which need to make outbound HTTP requests.
|
||||||
func Setup(
|
func Setup(
|
||||||
servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI,
|
servMux *http.ServeMux, httpClient *http.Client, cfg config.Dendrite,
|
||||||
producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
|
producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
|
||||||
accountDB *accounts.Database,
|
accountDB *accounts.Database,
|
||||||
deviceDB *devices.Database,
|
deviceDB *devices.Database,
|
||||||
|
|
|
@ -23,11 +23,11 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/events"
|
"github.com/matrix-org/dendrite/clientapi/events"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
@ -91,15 +91,15 @@ type fledglingEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateRoom implements /createRoom
|
// CreateRoom implements /createRoom
|
||||||
func CreateRoom(req *http.Request, device *authtypes.Device, cfg config.ClientAPI, producer *producers.RoomserverProducer) util.JSONResponse {
|
func CreateRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, producer *producers.RoomserverProducer) util.JSONResponse {
|
||||||
// TODO: Check room ID doesn't clash with an existing one, and we
|
// TODO: Check room ID doesn't clash with an existing one, and we
|
||||||
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
|
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
|
||||||
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.ServerName)
|
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||||
return createRoom(req, device, cfg, roomID, producer)
|
return createRoom(req, device, cfg, roomID, producer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// createRoom implements /createRoom
|
// createRoom implements /createRoom
|
||||||
func createRoom(req *http.Request, device *authtypes.Device, cfg config.ClientAPI, roomID string, producer *producers.RoomserverProducer) util.JSONResponse {
|
func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer) util.JSONResponse {
|
||||||
logger := util.GetLogger(req.Context())
|
logger := util.GetLogger(req.Context())
|
||||||
userID := device.UserID
|
userID := device.UserID
|
||||||
var r createRoomRequest
|
var r createRoomRequest
|
||||||
|
@ -201,7 +201,7 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.ClientAP
|
||||||
// buildEvent fills out auth_events for the builder then builds the event
|
// buildEvent fills out auth_events for the builder then builds the event
|
||||||
func buildEvent(builder *gomatrixserverlib.EventBuilder,
|
func buildEvent(builder *gomatrixserverlib.EventBuilder,
|
||||||
provider gomatrixserverlib.AuthEventProvider,
|
provider gomatrixserverlib.AuthEventProvider,
|
||||||
cfg config.ClientAPI) (*gomatrixserverlib.Event, error) {
|
cfg config.Dendrite) (*gomatrixserverlib.Event, error) {
|
||||||
|
|
||||||
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -212,9 +212,9 @@ func buildEvent(builder *gomatrixserverlib.EventBuilder,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
builder.AuthEvents = refs
|
builder.AuthEvents = refs
|
||||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.ServerName)
|
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
event, err := builder.Build(eventID, now, cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
|
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot build event %s : Builder failed to build. %s", builder.Type, err)
|
return nil, fmt.Errorf("cannot build event %s : Builder failed to build. %s", builder.Type, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,10 +17,10 @@ package writers
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -36,7 +36,7 @@ func JoinRoomByIDOrAlias(
|
||||||
req *http.Request,
|
req *http.Request,
|
||||||
device *authtypes.Device,
|
device *authtypes.Device,
|
||||||
roomIDOrAlias string,
|
roomIDOrAlias string,
|
||||||
cfg config.ClientAPI,
|
cfg config.Dendrite,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
producer *producers.RoomserverProducer,
|
producer *producers.RoomserverProducer,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
@ -67,7 +67,7 @@ type joinRoomReq struct {
|
||||||
req *http.Request
|
req *http.Request
|
||||||
content map[string]interface{}
|
content map[string]interface{}
|
||||||
userID string
|
userID string
|
||||||
cfg config.ClientAPI
|
cfg config.Dendrite
|
||||||
federation *gomatrixserverlib.FederationClient
|
federation *gomatrixserverlib.FederationClient
|
||||||
producer *producers.RoomserverProducer
|
producer *producers.RoomserverProducer
|
||||||
queryAPI api.RoomserverQueryAPI
|
queryAPI api.RoomserverQueryAPI
|
||||||
|
@ -95,7 +95,7 @@ func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
|
||||||
JSON: jsonerror.BadJSON("Room alias must be in the form '#localpart:domain'"),
|
JSON: jsonerror.BadJSON("Room alias must be in the form '#localpart:domain'"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if domain == r.cfg.ServerName {
|
if domain == r.cfg.Matrix.ServerName {
|
||||||
// TODO: Implement joining local room aliases.
|
// TODO: Implement joining local room aliases.
|
||||||
panic(fmt.Errorf("Joining local room aliases is not implemented"))
|
panic(fmt.Errorf("Joining local room aliases is not implemented"))
|
||||||
} else {
|
} else {
|
||||||
|
@ -212,9 +212,9 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
|
||||||
r.writeToBuilder(&respMakeJoin.JoinEvent, roomID)
|
r.writeToBuilder(&respMakeJoin.JoinEvent, roomID)
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.ServerName)
|
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.Matrix.ServerName)
|
||||||
event, err := respMakeJoin.JoinEvent.Build(
|
event, err := respMakeJoin.JoinEvent.Build(
|
||||||
eventID, now, r.cfg.ServerName, r.cfg.KeyID, r.cfg.PrivateKey,
|
eventID, now, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID, r.cfg.Matrix.PrivateKey,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
res := httputil.LogThenError(r.req, err)
|
res := httputil.LogThenError(r.req, err)
|
||||||
|
|
|
@ -21,10 +21,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
@ -39,7 +39,14 @@ type sendEventResponse struct {
|
||||||
// SendEvent implements:
|
// SendEvent implements:
|
||||||
// /rooms/{roomID}/send/{eventType}/{txnID}
|
// /rooms/{roomID}/send/{eventType}/{txnID}
|
||||||
// /rooms/{roomID}/state/{eventType}/{stateKey}
|
// /rooms/{roomID}/state/{eventType}/{stateKey}
|
||||||
func SendEvent(req *http.Request, device *authtypes.Device, roomID, eventType, txnID string, stateKey *string, cfg config.ClientAPI, queryAPI api.RoomserverQueryAPI, producer *producers.RoomserverProducer) util.JSONResponse {
|
func SendEvent(
|
||||||
|
req *http.Request,
|
||||||
|
device *authtypes.Device,
|
||||||
|
roomID, eventType, txnID string, stateKey *string,
|
||||||
|
cfg config.Dendrite,
|
||||||
|
queryAPI api.RoomserverQueryAPI,
|
||||||
|
producer *producers.RoomserverProducer,
|
||||||
|
) util.JSONResponse {
|
||||||
// parse the incoming http request
|
// parse the incoming http request
|
||||||
userID := device.UserID
|
userID := device.UserID
|
||||||
var r map[string]interface{} // must be a JSON object
|
var r map[string]interface{} // must be a JSON object
|
||||||
|
@ -86,8 +93,10 @@ func SendEvent(req *http.Request, device *authtypes.Device, roomID, eventType, t
|
||||||
refs = append(refs, e.EventReference())
|
refs = append(refs, e.EventReference())
|
||||||
}
|
}
|
||||||
builder.AuthEvents = refs
|
builder.AuthEvents = refs
|
||||||
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.ServerName)
|
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
|
||||||
e, err := builder.Build(eventID, time.Now(), cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
|
e, err := builder.Build(
|
||||||
|
eventID, time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,16 +15,16 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flag"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/clientapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/clientapi/routing"
|
"github.com/matrix-org/dendrite/clientapi/routing"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/common/keydb"
|
"github.com/matrix-org/dendrite/common/keydb"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
||||||
|
@ -34,70 +34,45 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
|
||||||
bindAddr = os.Getenv("BIND_ADDRESS")
|
|
||||||
logDir = os.Getenv("LOG_DIR")
|
logDir = os.Getenv("LOG_DIR")
|
||||||
roomserverURL = os.Getenv("ROOMSERVER_URL")
|
configPath = flag.String("config", "dendrite.yaml", "The path to the config file, For more information see the config file in this repository")
|
||||||
clientAPIOutputTopic = os.Getenv("CLIENTAPI_OUTPUT_TOPIC")
|
|
||||||
serverName = gomatrixserverlib.ServerName(os.Getenv("SERVER_NAME"))
|
|
||||||
serverKey = os.Getenv("SERVER_KEY")
|
|
||||||
accountDataSource = os.Getenv("ACCOUNT_DATABASE")
|
|
||||||
keyDataSource = os.Getenv("KEY_DATABASE")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
common.SetupLogging(logDir)
|
common.SetupLogging(logDir)
|
||||||
if bindAddr == "" {
|
|
||||||
log.Panic("No BIND_ADDRESS environment variable found.")
|
|
||||||
}
|
|
||||||
if len(kafkaURIs) == 0 {
|
|
||||||
// the kafka default is :9092
|
|
||||||
kafkaURIs = []string{"localhost:9092"}
|
|
||||||
}
|
|
||||||
if roomserverURL == "" {
|
|
||||||
log.Panic("No ROOMSERVER_URL environment variable found.")
|
|
||||||
}
|
|
||||||
if clientAPIOutputTopic == "" {
|
|
||||||
log.Panic("No CLIENTAPI_OUTPUT_TOPIC environment variable found. This should match the roomserver input topic.")
|
|
||||||
}
|
|
||||||
if serverName == "" {
|
|
||||||
serverName = "localhost"
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := config.ClientAPI{
|
flag.Parse()
|
||||||
ServerName: serverName,
|
|
||||||
KafkaProducerURIs: kafkaURIs,
|
|
||||||
ClientAPIOutputTopic: clientAPIOutputTopic,
|
|
||||||
RoomserverURL: roomserverURL,
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
cfg, err := config.Load(*configPath)
|
||||||
cfg.KeyID, cfg.PrivateKey, err = common.ReadKey(serverKey)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to load private key: %s", err)
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Starting clientapi")
|
log.Info("config: ", cfg)
|
||||||
|
|
||||||
roomserverProducer, err := producers.NewRoomserverProducer(cfg.KafkaProducerURIs, cfg.ClientAPIOutputTopic)
|
roomserverProducer, err := producers.NewRoomserverProducer(
|
||||||
|
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.KafkaProducerURIs, err)
|
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
|
federation := gomatrixserverlib.NewFederationClient(
|
||||||
|
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
||||||
|
)
|
||||||
|
|
||||||
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil)
|
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
accountDB, err := accounts.NewDatabase(accountDataSource, serverName)
|
accountDB, err := accounts.NewDatabase(string(cfg.Database.Account), cfg.Matrix.ServerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup account database(%q): %s", accountDataSource, err.Error())
|
log.Panicf("Failed to setup account database(%q): %s", cfg.Database.Account, err.Error())
|
||||||
}
|
}
|
||||||
deviceDB, err := devices.NewDatabase(accountDataSource, serverName)
|
deviceDB, err := devices.NewDatabase(string(cfg.Database.Device), cfg.Matrix.ServerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup device database(%q): %s", accountDataSource, err.Error())
|
log.Panicf("Failed to setup device database(%q): %s", cfg.Database.Device, err.Error())
|
||||||
}
|
}
|
||||||
keyDB, err := keydb.NewDatabase(keyDataSource)
|
keyDB, err := keydb.NewDatabase(string(cfg.Database.ServerKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup key database(%q): %s", keyDataSource, err.Error())
|
log.Panicf("Failed to setup key database(%q): %s", cfg.Database.ServerKey, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
keyRing := gomatrixserverlib.KeyRing{
|
keyRing := gomatrixserverlib.KeyRing{
|
||||||
|
@ -108,9 +83,10 @@ func main() {
|
||||||
KeyDatabase: keyDB,
|
KeyDatabase: keyDB,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
|
||||||
routing.Setup(
|
routing.Setup(
|
||||||
http.DefaultServeMux, http.DefaultClient, cfg, roomserverProducer,
|
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
|
||||||
queryAPI, accountDB, deviceDB, federation, keyRing,
|
queryAPI, accountDB, deviceDB, federation, keyRing,
|
||||||
)
|
)
|
||||||
log.Fatal(http.ListenAndServe(bindAddr, nil))
|
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,16 +15,14 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/base64"
|
"flag"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/common/keydb"
|
"github.com/matrix-org/dendrite/common/keydb"
|
||||||
"github.com/matrix-org/dendrite/federationapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/routing"
|
"github.com/matrix-org/dendrite/federationapi/routing"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -33,73 +31,28 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bindAddr = os.Getenv("BIND_ADDRESS")
|
|
||||||
logDir = os.Getenv("LOG_DIR")
|
logDir = os.Getenv("LOG_DIR")
|
||||||
serverName = gomatrixserverlib.ServerName(os.Getenv("SERVER_NAME"))
|
configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||||
serverKey = os.Getenv("SERVER_KEY")
|
|
||||||
// Base64 encoded SHA256 TLS fingerprint of the X509 certificate used by
|
|
||||||
// the public federation listener for this server.
|
|
||||||
// Can be generated from a PEM certificate called "server.crt" using:
|
|
||||||
//
|
|
||||||
// openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\
|
|
||||||
// python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")'
|
|
||||||
//
|
|
||||||
tlsFingerprint = os.Getenv("TLS_FINGERPRINT")
|
|
||||||
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
|
||||||
roomserverURL = os.Getenv("ROOMSERVER_URL")
|
|
||||||
roomserverInputTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
|
|
||||||
keyDataSource = os.Getenv("KEY_DATABASE")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
common.SetupLogging(logDir)
|
common.SetupLogging(logDir)
|
||||||
if bindAddr == "" {
|
|
||||||
log.Panic("No BIND_ADDRESS environment variable found.")
|
|
||||||
}
|
|
||||||
|
|
||||||
if serverName == "" {
|
if *configPath == "" {
|
||||||
serverName = "localhost"
|
log.Fatal("--config must be supplied")
|
||||||
}
|
}
|
||||||
|
cfg, err := config.Load(*configPath)
|
||||||
if tlsFingerprint == "" {
|
|
||||||
log.Panic("No TLS_FINGERPRINT environment variable found.")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(kafkaURIs) == 0 {
|
|
||||||
// the kafka default is :9092
|
|
||||||
kafkaURIs = []string{"localhost:9092"}
|
|
||||||
}
|
|
||||||
|
|
||||||
if roomserverURL == "" {
|
|
||||||
log.Panic("No ROOMSERVER_URL environment variable found.")
|
|
||||||
}
|
|
||||||
|
|
||||||
if roomserverInputTopic == "" {
|
|
||||||
log.Panic("No TOPIC_INPUT_ROOM_EVENT environment variable found. This should match the roomserver input topic.")
|
|
||||||
}
|
|
||||||
cfg := config.FederationAPI{
|
|
||||||
ServerName: serverName,
|
|
||||||
// TODO: make the validity period configurable.
|
|
||||||
ValidityPeriod: 24 * time.Hour,
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
cfg.KeyID, cfg.PrivateKey, err = common.ReadKey(serverKey)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to load private key: %s", err)
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var fingerprintSHA256 []byte
|
federation := gomatrixserverlib.NewFederationClient(
|
||||||
if fingerprintSHA256, err = base64.RawStdEncoding.DecodeString(tlsFingerprint); err != nil {
|
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
|
||||||
log.Panicf("Failed to load TLS fingerprint: %s", err)
|
)
|
||||||
}
|
|
||||||
cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}}
|
|
||||||
|
|
||||||
federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
|
keyDB, err := keydb.NewDatabase(string(cfg.Database.ServerKey))
|
||||||
|
|
||||||
keyDB, err := keydb.NewDatabase(keyDataSource)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup key database(%q): %s", keyDataSource, err.Error())
|
log.Panicf("Failed to setup key database(%q): %s", cfg.Database.ServerKey, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
keyRing := gomatrixserverlib.KeyRing{
|
keyRing := gomatrixserverlib.KeyRing{
|
||||||
|
@ -109,13 +62,19 @@ func main() {
|
||||||
},
|
},
|
||||||
KeyDatabase: keyDB,
|
KeyDatabase: keyDB,
|
||||||
}
|
}
|
||||||
queryAPI := api.NewRoomserverQueryAPIHTTP(roomserverURL, nil)
|
|
||||||
|
|
||||||
roomserverProducer, err := producers.NewRoomserverProducer(kafkaURIs, roomserverInputTopic)
|
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
|
||||||
|
|
||||||
|
roomserverProducer, err := producers.NewRoomserverProducer(
|
||||||
|
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
|
||||||
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err)
|
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.Kafka.Addresses, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing, federation)
|
log.Info("Starting federation API server on ", cfg.Listen.FederationAPI)
|
||||||
log.Fatal(http.ListenAndServe(bindAddr, nil))
|
|
||||||
|
routing.Setup(http.DefaultServeMux, *cfg, queryAPI, roomserverProducer, keyRing, federation)
|
||||||
|
log.Fatal(http.ListenAndServe(string(cfg.Listen.FederationAPI), nil))
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,34 +16,19 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/user"
|
|
||||||
"path/filepath"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/routing"
|
"github.com/matrix-org/dendrite/mediaapi/routing"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
yaml "gopkg.in/yaml.v2"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bindAddr = flag.String("listen", "", "The port to listen on.")
|
|
||||||
dataSource = os.Getenv("DATABASE")
|
|
||||||
logDir = os.Getenv("LOG_DIR")
|
logDir = os.Getenv("LOG_DIR")
|
||||||
serverName = os.Getenv("SERVER_NAME")
|
|
||||||
basePath = os.Getenv("BASE_PATH")
|
|
||||||
// Note: if the MAX_FILE_SIZE_BYTES is set to 0, it will be unlimited
|
|
||||||
maxFileSizeBytesString = os.Getenv("MAX_FILE_SIZE_BYTES")
|
|
||||||
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -52,206 +37,21 @@ func main() {
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
if *configPath == "" {
|
||||||
"listen": *bindAddr,
|
log.Fatal("--config must be supplied")
|
||||||
"DATABASE": dataSource,
|
}
|
||||||
"LOG_DIR": logDir,
|
cfg, err := config.Load(*configPath)
|
||||||
"SERVER_NAME": serverName,
|
|
||||||
"BASE_PATH": basePath,
|
|
||||||
"MAX_FILE_SIZE_BYTES": maxFileSizeBytesString,
|
|
||||||
"config": *configPath,
|
|
||||||
}).Info("Loading configuration based on config file and environment variables")
|
|
||||||
|
|
||||||
cfg, err := configureServer()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Fatal("Invalid configuration")
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithFields(log.Fields{
|
db, err := storage.Open(string(cfg.Database.MediaAPI))
|
||||||
"listen": *bindAddr,
|
|
||||||
"LOG_DIR": logDir,
|
|
||||||
"CONFIG_PATH": *configPath,
|
|
||||||
"ServerName": cfg.ServerName,
|
|
||||||
"AbsBasePath": cfg.AbsBasePath,
|
|
||||||
"MaxFileSizeBytes": *cfg.MaxFileSizeBytes,
|
|
||||||
"DataSource": cfg.DataSource,
|
|
||||||
"DynamicThumbnails": cfg.DynamicThumbnails,
|
|
||||||
"MaxThumbnailGenerators": cfg.MaxThumbnailGenerators,
|
|
||||||
"ThumbnailSizes": cfg.ThumbnailSizes,
|
|
||||||
}).Info("Starting mediaapi server with configuration")
|
|
||||||
|
|
||||||
db, err := storage.Open(cfg.DataSource)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Panic("Failed to open database")
|
log.WithError(err).Panic("Failed to open database")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info("Starting media API server on ", cfg.Listen.MediaAPI)
|
||||||
|
|
||||||
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, db)
|
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, db)
|
||||||
log.Fatal(http.ListenAndServe(*bindAddr, nil))
|
log.Fatal(http.ListenAndServe(string(cfg.Listen.MediaAPI), nil))
|
||||||
}
|
|
||||||
|
|
||||||
// configureServer loads configuration from a yaml file and overrides with environment variables
|
|
||||||
func configureServer() (*config.MediaAPI, error) {
|
|
||||||
if *configPath == "" {
|
|
||||||
log.Fatal("--config must be supplied")
|
|
||||||
}
|
|
||||||
cfg, err := loadConfig(*configPath)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Fatal("Invalid config file")
|
|
||||||
}
|
|
||||||
|
|
||||||
// override values from environment variables
|
|
||||||
applyOverrides(cfg)
|
|
||||||
|
|
||||||
if err := validateConfig(cfg); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return cfg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: make common somehow? copied from sync api
|
|
||||||
func loadConfig(configPath string) (*config.MediaAPI, error) {
|
|
||||||
contents, err := ioutil.ReadFile(configPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var cfg config.MediaAPI
|
|
||||||
if err = yaml.Unmarshal(contents, &cfg); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &cfg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func applyOverrides(cfg *config.MediaAPI) {
|
|
||||||
if serverName != "" {
|
|
||||||
if cfg.ServerName != "" {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"server_name": cfg.ServerName,
|
|
||||||
"SERVER_NAME": serverName,
|
|
||||||
}).Info("Overriding server_name from config file with environment variable")
|
|
||||||
}
|
|
||||||
cfg.ServerName = gomatrixserverlib.ServerName(serverName)
|
|
||||||
}
|
|
||||||
if cfg.ServerName == "" {
|
|
||||||
log.Info("ServerName not set. Defaulting to 'localhost'.")
|
|
||||||
cfg.ServerName = "localhost"
|
|
||||||
}
|
|
||||||
|
|
||||||
if basePath != "" {
|
|
||||||
if cfg.BasePath != "" {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"base_path": cfg.BasePath,
|
|
||||||
"BASE_PATH": basePath,
|
|
||||||
}).Info("Overriding base_path from config file with environment variable")
|
|
||||||
}
|
|
||||||
cfg.BasePath = types.Path(basePath)
|
|
||||||
}
|
|
||||||
|
|
||||||
if maxFileSizeBytesString != "" {
|
|
||||||
if cfg.MaxFileSizeBytes != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"max_file_size_bytes": *cfg.MaxFileSizeBytes,
|
|
||||||
"MAX_FILE_SIZE_BYTES": maxFileSizeBytesString,
|
|
||||||
}).Info("Overriding max_file_size_bytes from config file with environment variable")
|
|
||||||
}
|
|
||||||
maxFileSizeBytesInt, err := strconv.ParseInt(maxFileSizeBytesString, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
maxFileSizeBytesInt = 10 * 1024 * 1024
|
|
||||||
log.WithError(err).WithField(
|
|
||||||
"MAX_FILE_SIZE_BYTES", maxFileSizeBytesString,
|
|
||||||
).Infof("MAX_FILE_SIZE_BYTES not set? Defaulting to %v bytes.", maxFileSizeBytesInt)
|
|
||||||
}
|
|
||||||
maxFileSizeBytes := types.FileSizeBytes(maxFileSizeBytesInt)
|
|
||||||
cfg.MaxFileSizeBytes = &maxFileSizeBytes
|
|
||||||
}
|
|
||||||
|
|
||||||
if dataSource != "" {
|
|
||||||
if cfg.DataSource != "" {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"database": cfg.DataSource,
|
|
||||||
"DATABASE": dataSource,
|
|
||||||
}).Info("Overriding database from config file with environment variable")
|
|
||||||
}
|
|
||||||
cfg.DataSource = dataSource
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.MaxThumbnailGenerators == 0 {
|
|
||||||
log.WithField(
|
|
||||||
"max_thumbnail_generators", cfg.MaxThumbnailGenerators,
|
|
||||||
).Info("Using default max_thumbnail_generators value of 10")
|
|
||||||
cfg.MaxThumbnailGenerators = 10
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func validateConfig(cfg *config.MediaAPI) error {
|
|
||||||
if *bindAddr == "" {
|
|
||||||
log.Fatal("--listen must be supplied")
|
|
||||||
}
|
|
||||||
|
|
||||||
absBasePath, err := getAbsolutePath(cfg.BasePath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("invalid base path (%v): %q", cfg.BasePath, err)
|
|
||||||
}
|
|
||||||
cfg.AbsBasePath = types.Path(absBasePath)
|
|
||||||
|
|
||||||
if *cfg.MaxFileSizeBytes < 0 {
|
|
||||||
return fmt.Errorf("invalid max file size bytes (%v)", *cfg.MaxFileSizeBytes)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.DataSource == "" {
|
|
||||||
return fmt.Errorf("invalid database (%v)", cfg.DataSource)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, config := range cfg.ThumbnailSizes {
|
|
||||||
if config.Width <= 0 || config.Height <= 0 {
|
|
||||||
return fmt.Errorf("invalid thumbnail size %vx%v", config.Width, config.Height)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getAbsolutePath(basePath types.Path) (types.Path, error) {
|
|
||||||
var err error
|
|
||||||
if basePath == "" {
|
|
||||||
var wd string
|
|
||||||
wd, err = os.Getwd()
|
|
||||||
return types.Path(wd), err
|
|
||||||
}
|
|
||||||
// Note: If we got here len(basePath) >= 1
|
|
||||||
if basePath[0] == '~' {
|
|
||||||
basePath, err = expandHomeDir(basePath)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
absBasePath, err := filepath.Abs(string(basePath))
|
|
||||||
return types.Path(absBasePath), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// expandHomeDir parses paths beginning with ~/path or ~user/path and replaces the home directory part
|
|
||||||
func expandHomeDir(basePath types.Path) (types.Path, error) {
|
|
||||||
slash := strings.Index(string(basePath), "/")
|
|
||||||
if slash == -1 {
|
|
||||||
// pretend the slash is after the path as none was found within the string
|
|
||||||
// simplifies code using slash below
|
|
||||||
slash = len(basePath)
|
|
||||||
}
|
|
||||||
var usr *user.User
|
|
||||||
var err error
|
|
||||||
if slash == 1 {
|
|
||||||
// basePath is ~ or ~/path
|
|
||||||
usr, err = user.Current()
|
|
||||||
if err != nil {
|
|
||||||
return "", fmt.Errorf("failed to get user's home directory: %q", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// slash > 1
|
|
||||||
// basePath is ~user or ~user/path
|
|
||||||
usr, err = user.Lookup(string(basePath[1:slash]))
|
|
||||||
if err != nil {
|
|
||||||
return "", fmt.Errorf("failed to get user's home directory: %q", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return types.Path(filepath.Join(usr.HomeDir, string(basePath[slash:]))), nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,14 +15,16 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
|
|
||||||
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/input"
|
"github.com/matrix-org/dendrite/roomserver/input"
|
||||||
"github.com/matrix-org/dendrite/roomserver/query"
|
"github.com/matrix-org/dendrite/roomserver/query"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
|
@ -31,42 +33,48 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
database = os.Getenv("DATABASE")
|
logDir = os.Getenv("LOG_DIR")
|
||||||
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
|
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
|
||||||
inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
|
|
||||||
outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT")
|
|
||||||
bindAddr = os.Getenv("BIND_ADDRESS")
|
|
||||||
// Shuts the roomserver down after processing a given number of messages.
|
|
||||||
// This is useful for running benchmarks for seeing how quickly the server
|
|
||||||
// can process a given number of messages.
|
|
||||||
stopProcessingAfter = os.Getenv("STOP_AFTER")
|
stopProcessingAfter = os.Getenv("STOP_AFTER")
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
db, err := storage.Open(database)
|
common.SetupLogging(logDir)
|
||||||
|
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
if *configPath == "" {
|
||||||
|
log.Fatal("--config must be supplied")
|
||||||
|
}
|
||||||
|
cfg, err := config.Load(*configPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := storage.Open(string(cfg.Database.RoomServer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kafkaConsumer, err := sarama.NewConsumer(kafkaURIs, nil)
|
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kafkaProducer, err := sarama.NewSyncProducer(kafkaURIs, nil)
|
kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer := input.Consumer{
|
consumer := input.Consumer{
|
||||||
ContinualConsumer: common.ContinualConsumer{
|
ContinualConsumer: common.ContinualConsumer{
|
||||||
Topic: inputRoomEventTopic,
|
Topic: string(cfg.Kafka.Topics.InputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: db,
|
PartitionStore: db,
|
||||||
},
|
},
|
||||||
DB: db,
|
DB: db,
|
||||||
Producer: kafkaProducer,
|
Producer: kafkaProducer,
|
||||||
OutputRoomEventTopic: outputRoomEventTopic,
|
OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
}
|
}
|
||||||
|
|
||||||
if stopProcessingAfter != "" {
|
if stopProcessingAfter != "" {
|
||||||
|
@ -93,10 +101,10 @@ func main() {
|
||||||
|
|
||||||
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
||||||
|
|
||||||
fmt.Println("Started roomserver")
|
log.Info("Started room server on ", cfg.Listen.RoomServer)
|
||||||
|
|
||||||
// TODO: Implement clean shutdown.
|
// TODO: Implement clean shutdown.
|
||||||
if err := http.ListenAndServe(bindAddr, nil); err != nil {
|
if err := http.ListenAndServe(string(cfg.Listen.RoomServer), nil); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,13 +16,12 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/consumers"
|
"github.com/matrix-org/dendrite/syncapi/consumers"
|
||||||
"github.com/matrix-org/dendrite/syncapi/routing"
|
"github.com/matrix-org/dendrite/syncapi/routing"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
@ -30,27 +29,9 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
yaml "gopkg.in/yaml.v2"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var configPath = flag.String("config", "sync-server-config.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||||
var bindAddr = flag.String("listen", ":4200", "The port to listen on.")
|
|
||||||
|
|
||||||
func loadConfig(configPath string) (*config.Sync, error) {
|
|
||||||
contents, err := ioutil.ReadFile(configPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var cfg config.Sync
|
|
||||||
if err = yaml.Unmarshal(contents, &cfg); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// check required fields
|
|
||||||
if cfg.ServerName == "" {
|
|
||||||
log.Fatalf("'server_name' must be supplied in %s", configPath)
|
|
||||||
}
|
|
||||||
return &cfg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
common.SetupLogging(os.Getenv("LOG_DIR"))
|
common.SetupLogging(os.Getenv("LOG_DIR"))
|
||||||
|
@ -60,26 +41,21 @@ func main() {
|
||||||
if *configPath == "" {
|
if *configPath == "" {
|
||||||
log.Fatal("--config must be supplied")
|
log.Fatal("--config must be supplied")
|
||||||
}
|
}
|
||||||
cfg, err := loadConfig(*configPath)
|
cfg, err := config.Load(*configPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Invalid config file: %s", err)
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if *bindAddr == "" {
|
log.Info("config: ", cfg)
|
||||||
log.Fatal("--listen must be supplied")
|
|
||||||
|
db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI))
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.Database.SyncAPI, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("sync server config: ", cfg)
|
deviceDB, err := devices.NewDatabase(string(cfg.Database.Device), cfg.Matrix.ServerName)
|
||||||
|
|
||||||
db, err := storage.NewSyncServerDatabase(cfg.DataSource)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err)
|
log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err)
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: DO NOT USE THIS DATA SOURCE (it's the sync one, not devices!)
|
|
||||||
deviceDB, err := devices.NewDatabase(cfg.DataSource, cfg.ServerName)
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("startup: failed to create device database with data source %s : %s", cfg.DataSource, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pos, err := db.SyncStreamPosition()
|
pos, err := db.SyncStreamPosition()
|
||||||
|
@ -88,7 +64,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
n := sync.NewNotifier(types.StreamPosition(pos))
|
n := sync.NewNotifier(types.StreamPosition(pos))
|
||||||
if err := n.Load(db); err != nil {
|
if err = n.Load(db); err != nil {
|
||||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||||
}
|
}
|
||||||
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
||||||
|
@ -99,7 +75,7 @@ func main() {
|
||||||
log.Panicf("startup: failed to start room server consumer")
|
log.Panicf("startup: failed to start room server consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Starting sync server on ", *bindAddr)
|
log.Info("Starting sync server on ", cfg.Listen.SyncAPI)
|
||||||
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n), deviceDB)
|
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n), deviceDB)
|
||||||
log.Fatal(http.ListenAndServe(*bindAddr, nil))
|
log.Fatal(http.ListenAndServe(string(cfg.Listen.SyncAPI), nil))
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,11 @@ import (
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/common/test"
|
"github.com/matrix-org/dendrite/common/test"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"gopkg.in/yaml.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -41,10 +42,10 @@ var (
|
||||||
postgresContainerName = os.Getenv("POSTGRES_CONTAINER")
|
postgresContainerName = os.Getenv("POSTGRES_CONTAINER")
|
||||||
// Test image to be uploaded/downloaded
|
// Test image to be uploaded/downloaded
|
||||||
testJPEG = test.Defaulting(os.Getenv("TEST_JPEG_PATH"), "src/github.com/matrix-org/dendrite/cmd/mediaapi-integration-tests/totem.jpg")
|
testJPEG = test.Defaulting(os.Getenv("TEST_JPEG_PATH"), "src/github.com/matrix-org/dendrite/cmd/mediaapi-integration-tests/totem.jpg")
|
||||||
|
kafkaURI = test.Defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092")
|
||||||
)
|
)
|
||||||
|
|
||||||
var thumbnailPregenerationConfig = (`
|
var thumbnailSizes = (`
|
||||||
thumbnail_sizes:
|
|
||||||
- width: 32
|
- width: 32
|
||||||
height: 32
|
height: 32
|
||||||
method: crop
|
method: crop
|
||||||
|
@ -68,65 +69,51 @@ var testDatabaseTemplate = "dbname=%s sslmode=disable binary_parameters=yes"
|
||||||
|
|
||||||
var timeout time.Duration
|
var timeout time.Duration
|
||||||
|
|
||||||
|
var port = 10000
|
||||||
|
|
||||||
func startMediaAPI(suffix string, dynamicThumbnails bool) (*exec.Cmd, chan error, string, *exec.Cmd, chan error, string, string) {
|
func startMediaAPI(suffix string, dynamicThumbnails bool) (*exec.Cmd, chan error, string, *exec.Cmd, chan error, string, string) {
|
||||||
dir, err := ioutil.TempDir("", serverType+"-server-test"+suffix)
|
dir, err := ioutil.TempDir("", serverType+"-server-test"+suffix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
serverAddr := "localhost:177" + suffix + "9"
|
|
||||||
proxyAddr := "localhost:1800" + suffix
|
proxyAddr := "localhost:1800" + suffix
|
||||||
|
|
||||||
configFilename := serverType + "-server-test-config" + suffix + ".yaml"
|
database := fmt.Sprintf(testDatabaseTemplate, testDatabaseName+suffix)
|
||||||
configFileContents := makeConfig(proxyAddr, suffix, dir, dynamicThumbnails)
|
cfg, nextPort, err := test.MakeConfig(dir, kafkaURI, database, "localhost", port)
|
||||||
|
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(proxyAddr)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
if err = yaml.Unmarshal([]byte(thumbnailSizes), &cfg.Media.ThumbnailSizes); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
port = nextPort
|
||||||
|
if err = test.WriteConfig(cfg, dir); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
serverArgs := []string{
|
serverArgs := []string{
|
||||||
"--config", configFilename,
|
"--config", filepath.Join(dir, test.ConfigFile),
|
||||||
"--listen", serverAddr,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
databases := []string{
|
databases := []string{
|
||||||
testDatabaseName + suffix,
|
testDatabaseName + suffix,
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyCmd, proxyCmdChan := test.StartProxy(
|
proxyCmd, proxyCmdChan := test.StartProxy(proxyAddr, cfg)
|
||||||
proxyAddr,
|
|
||||||
"http://localhost:177"+suffix+"6",
|
|
||||||
"http://localhost:177"+suffix+"8",
|
|
||||||
"http://"+serverAddr,
|
|
||||||
)
|
|
||||||
|
|
||||||
cmd, cmdChan := test.StartServer(
|
cmd, cmdChan := test.StartServer(
|
||||||
serverType,
|
serverType,
|
||||||
serverArgs,
|
serverArgs,
|
||||||
suffix,
|
|
||||||
configFilename,
|
|
||||||
configFileContents,
|
|
||||||
postgresDatabase,
|
postgresDatabase,
|
||||||
postgresContainerName,
|
postgresContainerName,
|
||||||
databases,
|
databases,
|
||||||
)
|
)
|
||||||
|
|
||||||
fmt.Printf("==TESTSERVER== STARTED %v -> %v : %v\n", proxyAddr, serverAddr, dir)
|
fmt.Printf("==TESTSERVER== STARTED %v -> %v : %v\n", proxyAddr, cfg.Listen.MediaAPI, dir)
|
||||||
return cmd, cmdChan, serverAddr, proxyCmd, proxyCmdChan, proxyAddr, dir
|
return cmd, cmdChan, string(cfg.Listen.MediaAPI), proxyCmd, proxyCmdChan, proxyAddr, dir
|
||||||
}
|
|
||||||
|
|
||||||
func makeConfig(serverAddr, suffix, basePath string, dynamicThumbnails bool) string {
|
|
||||||
return fmt.Sprintf(
|
|
||||||
`
|
|
||||||
server_name: "%s"
|
|
||||||
base_path: %s
|
|
||||||
max_file_size_bytes: %s
|
|
||||||
database: "%s"
|
|
||||||
dynamic_thumbnails: %s
|
|
||||||
%s`,
|
|
||||||
serverAddr,
|
|
||||||
basePath,
|
|
||||||
"10485760",
|
|
||||||
fmt.Sprintf(testDatabaseTemplate, testDatabaseName+suffix),
|
|
||||||
strconv.FormatBool(dynamicThumbnails),
|
|
||||||
thumbnailPregenerationConfig,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func cleanUpServer(cmd *exec.Cmd, dir string) {
|
func cleanUpServer(cmd *exec.Cmd, dir string) {
|
||||||
|
|
|
@ -16,6 +16,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -34,8 +35,6 @@ var (
|
||||||
zookeeperURI = defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181")
|
zookeeperURI = defaulting(os.Getenv("ZOOKEEPER_URI"), "localhost:2181")
|
||||||
// The URI the kafka server is listening on.
|
// The URI the kafka server is listening on.
|
||||||
kafkaURI = defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092")
|
kafkaURI = defaulting(os.Getenv("KAFKA_URIS"), "localhost:9092")
|
||||||
// The address the roomserver should listen on.
|
|
||||||
roomserverAddr = defaulting(os.Getenv("ROOMSERVER_URI"), "localhost:9876")
|
|
||||||
// How long to wait for the roomserver to write the expected output messages.
|
// How long to wait for the roomserver to write the expected output messages.
|
||||||
// This needs to be high enough to account for the time it takes to create
|
// This needs to be high enough to account for the time it takes to create
|
||||||
// the postgres database tables which can take a while on travis.
|
// the postgres database tables which can take a while on travis.
|
||||||
|
@ -164,10 +163,22 @@ func runAndReadFromTopic(runCmd *exec.Cmd, topic string, count int, checkQueryAP
|
||||||
// a api.RoomserverQueryAPI client. The caller can use this function to check the
|
// a api.RoomserverQueryAPI client. The caller can use this function to check the
|
||||||
// behaviour of the query API.
|
// behaviour of the query API.
|
||||||
func testRoomserver(input []string, wantOutput []string, checkQueries func(api.RoomserverQueryAPI)) {
|
func testRoomserver(input []string, wantOutput []string, checkQueries func(api.RoomserverQueryAPI)) {
|
||||||
const (
|
dir, err := ioutil.TempDir("", "room-server-test")
|
||||||
inputTopic = "roomserverInput"
|
if err != nil {
|
||||||
outputTopic = "roomserverOutput"
|
panic(err)
|
||||||
)
|
}
|
||||||
|
|
||||||
|
cfg, _, err := test.MakeConfig(dir, kafkaURI, testDatabase, "localhost", 10000)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
if err := test.WriteConfig(cfg, dir); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
inputTopic := string(cfg.Kafka.Topics.InputRoomEvent)
|
||||||
|
outputTopic := string(cfg.Kafka.Topics.OutputRoomEvent)
|
||||||
|
|
||||||
exe.DeleteTopic(inputTopic)
|
exe.DeleteTopic(inputTopic)
|
||||||
if err := exe.CreateTopic(inputTopic); err != nil {
|
if err := exe.CreateTopic(inputTopic); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -181,7 +192,7 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := createDatabase(testDatabaseName); err != nil {
|
if err = createDatabase(testDatabaseName); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,18 +202,11 @@ func testRoomserver(input []string, wantOutput []string, checkQueries func(api.R
|
||||||
// We append to the environment rather than replacing so that any additional
|
// We append to the environment rather than replacing so that any additional
|
||||||
// postgres and golang environment variables such as PGHOST are passed to
|
// postgres and golang environment variables such as PGHOST are passed to
|
||||||
// the roomserver process.
|
// the roomserver process.
|
||||||
cmd.Env = append(
|
|
||||||
os.Environ(),
|
|
||||||
fmt.Sprintf("DATABASE=%s", testDatabase),
|
|
||||||
fmt.Sprintf("KAFKA_URIS=%s", kafkaURI),
|
|
||||||
fmt.Sprintf("TOPIC_INPUT_ROOM_EVENT=%s", inputTopic),
|
|
||||||
fmt.Sprintf("TOPIC_OUTPUT_ROOM_EVENT=%s", outputTopic),
|
|
||||||
fmt.Sprintf("BIND_ADDRESS=%s", roomserverAddr),
|
|
||||||
)
|
|
||||||
cmd.Stderr = os.Stderr
|
cmd.Stderr = os.Stderr
|
||||||
|
cmd.Args = []string{"dendrite-room-server", "--config", filepath.Join(dir, test.ConfigFile)}
|
||||||
|
|
||||||
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, len(wantOutput), func() {
|
gotOutput, err := runAndReadFromTopic(cmd, outputTopic, len(wantOutput), func() {
|
||||||
queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+roomserverAddr, nil)
|
queryAPI := api.NewRoomserverQueryAPIHTTP("http://"+string(cfg.Listen.RoomServer), nil)
|
||||||
checkQueries(queryAPI)
|
checkQueries(queryAPI)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -17,12 +17,14 @@ package main
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/common/test"
|
"github.com/matrix-org/dendrite/common/test"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -62,12 +64,6 @@ var exe = test.KafkaExecutor{
|
||||||
OutputWriter: os.Stderr,
|
OutputWriter: os.Stderr,
|
||||||
}
|
}
|
||||||
|
|
||||||
var syncServerConfigFileContents = (`consumer_uris: ["` + kafkaURI + `"]
|
|
||||||
roomserver_topic: "` + inputTopic + `"
|
|
||||||
database: "` + testDatabase + `"
|
|
||||||
server_name: "localhost"
|
|
||||||
`)
|
|
||||||
|
|
||||||
var timeout time.Duration
|
var timeout time.Duration
|
||||||
var clientEventTestData []string
|
var clientEventTestData []string
|
||||||
|
|
||||||
|
@ -126,11 +122,27 @@ func clientEventJSONForOutputRoomEvent(outputRoomEvent string) string {
|
||||||
// then starts the sync server. The Cmd being executed is returned. A channel is also returned,
|
// then starts the sync server. The Cmd being executed is returned. A channel is also returned,
|
||||||
// which will have any termination errors sent down it, followed immediately by the channel being closed.
|
// which will have any termination errors sent down it, followed immediately by the channel being closed.
|
||||||
func startSyncServer() (*exec.Cmd, chan error) {
|
func startSyncServer() (*exec.Cmd, chan error) {
|
||||||
const configFilename = "sync-api-server-config-test.yaml"
|
|
||||||
|
dir, err := ioutil.TempDir("", "syncapi-server-test")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg, _, err := test.MakeConfig(dir, kafkaURI, testDatabase, "localhost", 10000)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
// TODO use the address assigned by the config generator rather than clobbering.
|
||||||
|
cfg.Matrix.ServerName = "localhost"
|
||||||
|
cfg.Listen.SyncAPI = config.Address(syncserverAddr)
|
||||||
|
cfg.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic)
|
||||||
|
|
||||||
|
if err := test.WriteConfig(cfg, dir); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
serverArgs := []string{
|
serverArgs := []string{
|
||||||
"--config", configFilename,
|
"--config", filepath.Join(dir, test.ConfigFile),
|
||||||
"--listen", syncserverAddr,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
databases := []string{
|
databases := []string{
|
||||||
|
@ -140,9 +152,6 @@ func startSyncServer() (*exec.Cmd, chan error) {
|
||||||
cmd, cmdChan := test.StartServer(
|
cmd, cmdChan := test.StartServer(
|
||||||
"sync-api",
|
"sync-api",
|
||||||
serverArgs,
|
serverArgs,
|
||||||
"",
|
|
||||||
configFilename,
|
|
||||||
syncServerConfigFileContents,
|
|
||||||
postgresDatabase,
|
postgresDatabase,
|
||||||
postgresContainerName,
|
postgresContainerName,
|
||||||
databases,
|
databases,
|
||||||
|
|
|
@ -358,3 +358,12 @@ func fingerprintPEM(data []byte) *gomatrixserverlib.TLSFingerprint {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RoomServerURL returns an HTTP URL for where the roomserver is listening.
|
||||||
|
func (config *Dendrite) RoomServerURL() string {
|
||||||
|
// Hard code the roomserver to talk HTTP for now.
|
||||||
|
// If we support HTTPS we need to think of a practical way to do certificate validation.
|
||||||
|
// People setting up servers shouldn't need to get a certificate valid for the public
|
||||||
|
// internet for an internal API.
|
||||||
|
return "http://" + string(config.Listen.RoomServer)
|
||||||
|
}
|
||||||
|
|
|
@ -1,58 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package common
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/base64"
|
|
||||||
"fmt"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"golang.org/x/crypto/ed25519"
|
|
||||||
"io"
|
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ReadKey reads a server's private ed25519 key.
|
|
||||||
// If the key is the empty string then a random key is generated.
|
|
||||||
// Otherwise the key is the key ID and the base64 encoded private key
|
|
||||||
// separated by a single space character.
|
|
||||||
// E.g "ed25519:abcd ABCDEFGHIJKLMNOPabcdefghijklmnop01234567890"
|
|
||||||
func ReadKey(key string) (gomatrixserverlib.KeyID, ed25519.PrivateKey, error) {
|
|
||||||
var keyID gomatrixserverlib.KeyID
|
|
||||||
var seed io.Reader
|
|
||||||
if key == "" {
|
|
||||||
// TODO: We should fail if we don't have a private key rather than
|
|
||||||
// generating a throw away key.
|
|
||||||
keyID = gomatrixserverlib.KeyID("ed25519:something")
|
|
||||||
} else {
|
|
||||||
// TODO: We should be reading this from a PEM formatted file instead of
|
|
||||||
// reading from the environment directly.
|
|
||||||
parts := strings.SplitN(key, " ", 2)
|
|
||||||
keyID = gomatrixserverlib.KeyID(parts[0])
|
|
||||||
if len(parts) != 2 {
|
|
||||||
return "", nil, fmt.Errorf("Invalid server key: %q", key)
|
|
||||||
}
|
|
||||||
seedBytes, err := base64.RawStdEncoding.DecodeString(parts[1])
|
|
||||||
if err != nil {
|
|
||||||
return "", nil, err
|
|
||||||
}
|
|
||||||
seed = bytes.NewReader(seedBytes)
|
|
||||||
}
|
|
||||||
_, privKey, err := ed25519.GenerateKey(seed)
|
|
||||||
if err != nil {
|
|
||||||
return "", nil, err
|
|
||||||
}
|
|
||||||
return keyID, privKey, nil
|
|
||||||
}
|
|
|
@ -16,11 +16,12 @@ package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Defaulting allows assignment of string variables with a fallback default value
|
// Defaulting allows assignment of string variables with a fallback default value
|
||||||
|
@ -69,7 +70,7 @@ func CreateBackgroundCommand(command string, args []string) (*exec.Cmd, chan err
|
||||||
// which will have any termination errors sent down it, followed immediately by the channel being closed.
|
// which will have any termination errors sent down it, followed immediately by the channel being closed.
|
||||||
// If postgresContainerName is not an empty string, psql will be run from inside that container. If it is
|
// If postgresContainerName is not an empty string, psql will be run from inside that container. If it is
|
||||||
// an empty string, psql will be assumed to be in PATH.
|
// an empty string, psql will be assumed to be in PATH.
|
||||||
func StartServer(serverType string, serverArgs []string, suffix, configFilename, configFileContents, postgresDatabase, postgresContainerName string, databases []string) (*exec.Cmd, chan error) {
|
func StartServer(serverType string, serverArgs []string, postgresDatabase, postgresContainerName string, databases []string) (*exec.Cmd, chan error) {
|
||||||
if len(databases) > 0 {
|
if len(databases) > 0 {
|
||||||
var dbCmd string
|
var dbCmd string
|
||||||
var dbArgs []string
|
var dbArgs []string
|
||||||
|
@ -89,12 +90,6 @@ func StartServer(serverType string, serverArgs []string, suffix, configFilename,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if configFilename != "" {
|
|
||||||
if err := ioutil.WriteFile(configFilename, []byte(configFileContents), 0644); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return CreateBackgroundCommand(
|
return CreateBackgroundCommand(
|
||||||
filepath.Join(filepath.Dir(os.Args[0]), "dendrite-"+serverType+"-server"),
|
filepath.Join(filepath.Dir(os.Args[0]), "dendrite-"+serverType+"-server"),
|
||||||
serverArgs,
|
serverArgs,
|
||||||
|
@ -102,12 +97,12 @@ func StartServer(serverType string, serverArgs []string, suffix, configFilename,
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartProxy creates a reverse proxy
|
// StartProxy creates a reverse proxy
|
||||||
func StartProxy(bindAddr, syncAddr, clientAddr, mediaAddr string) (*exec.Cmd, chan error) {
|
func StartProxy(bindAddr string, cfg *config.Dendrite) (*exec.Cmd, chan error) {
|
||||||
proxyArgs := []string{
|
proxyArgs := []string{
|
||||||
"--bind-address", bindAddr,
|
"--bind-address", bindAddr,
|
||||||
"--sync-api-server-url", syncAddr,
|
"--sync-api-server-url", "http://" + string(cfg.Listen.SyncAPI),
|
||||||
"--client-api-server-url", clientAddr,
|
"--client-api-server-url", "http://" + string(cfg.Listen.ClientAPI),
|
||||||
"--media-api-server-url", mediaAddr,
|
"--media-api-server-url", "http://" + string(cfg.Listen.MediaAPI),
|
||||||
}
|
}
|
||||||
return CreateBackgroundCommand(
|
return CreateBackgroundCommand(
|
||||||
filepath.Join(filepath.Dir(os.Args[0]), "client-api-proxy"),
|
filepath.Join(filepath.Dir(os.Args[0]), "client-api-proxy"),
|
||||||
|
|
|
@ -1,39 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package config
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"golang.org/x/crypto/ed25519"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// FederationAPI contains the config information necessary to spin up a federationapi process.
|
|
||||||
type FederationAPI struct {
|
|
||||||
// The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
|
|
||||||
ServerName gomatrixserverlib.ServerName
|
|
||||||
// The private key which will be used to sign requests.
|
|
||||||
PrivateKey ed25519.PrivateKey
|
|
||||||
// An arbitrary string used to uniquely identify the PrivateKey. Must start with the
|
|
||||||
// prefix "ed25519:".
|
|
||||||
KeyID gomatrixserverlib.KeyID
|
|
||||||
// A list of SHA256 TLS fingerprints for this server.
|
|
||||||
TLSFingerPrints []gomatrixserverlib.TLSFingerprint
|
|
||||||
// How long a remote server can cache our server key for before requesting it again.
|
|
||||||
// Increasing this number will reduce the number of requests made by remote servers
|
|
||||||
// for our key, but increases the period a compromised key will be considered valid
|
|
||||||
// by remote servers.
|
|
||||||
ValidityPeriod time.Duration
|
|
||||||
}
|
|
|
@ -16,7 +16,7 @@ package readers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/matrix-org/dendrite/federationapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"golang.org/x/crypto/ed25519"
|
"golang.org/x/crypto/ed25519"
|
||||||
|
@ -26,29 +26,29 @@ import (
|
||||||
|
|
||||||
// LocalKeys returns the local keys for the server.
|
// LocalKeys returns the local keys for the server.
|
||||||
// See https://matrix.org/docs/spec/server_server/unstable.html#publishing-keys
|
// See https://matrix.org/docs/spec/server_server/unstable.html#publishing-keys
|
||||||
func LocalKeys(req *http.Request, cfg config.FederationAPI) util.JSONResponse {
|
func LocalKeys(req *http.Request, cfg config.Dendrite) util.JSONResponse {
|
||||||
keys, err := localKeys(cfg, time.Now().Add(cfg.ValidityPeriod))
|
keys, err := localKeys(cfg, time.Now().Add(cfg.Matrix.KeyValidityPeriod))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return util.JSONResponse{Code: 200, JSON: keys}
|
return util.JSONResponse{Code: 200, JSON: keys}
|
||||||
}
|
}
|
||||||
|
|
||||||
func localKeys(cfg config.FederationAPI, validUntil time.Time) (*gomatrixserverlib.ServerKeys, error) {
|
func localKeys(cfg config.Dendrite, validUntil time.Time) (*gomatrixserverlib.ServerKeys, error) {
|
||||||
var keys gomatrixserverlib.ServerKeys
|
var keys gomatrixserverlib.ServerKeys
|
||||||
|
|
||||||
keys.ServerName = cfg.ServerName
|
keys.ServerName = cfg.Matrix.ServerName
|
||||||
keys.FromServer = cfg.ServerName
|
keys.FromServer = cfg.Matrix.ServerName
|
||||||
|
|
||||||
publicKey := cfg.PrivateKey.Public().(ed25519.PublicKey)
|
publicKey := cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey)
|
||||||
|
|
||||||
keys.VerifyKeys = map[gomatrixserverlib.KeyID]gomatrixserverlib.VerifyKey{
|
keys.VerifyKeys = map[gomatrixserverlib.KeyID]gomatrixserverlib.VerifyKey{
|
||||||
cfg.KeyID: {
|
cfg.Matrix.KeyID: {
|
||||||
gomatrixserverlib.Base64String(publicKey),
|
gomatrixserverlib.Base64String(publicKey),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
keys.TLSFingerprints = cfg.TLSFingerPrints
|
keys.TLSFingerprints = cfg.Matrix.TLSFingerPrints
|
||||||
keys.OldVerifyKeys = map[gomatrixserverlib.KeyID]gomatrixserverlib.OldVerifyKey{}
|
keys.OldVerifyKeys = map[gomatrixserverlib.KeyID]gomatrixserverlib.OldVerifyKey{}
|
||||||
keys.ValidUntilTS = gomatrixserverlib.AsTimestamp(validUntil)
|
keys.ValidUntilTS = gomatrixserverlib.AsTimestamp(validUntil)
|
||||||
|
|
||||||
|
@ -57,7 +57,9 @@ func localKeys(cfg config.FederationAPI, validUntil time.Time) (*gomatrixserverl
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
keys.Raw, err = gomatrixserverlib.SignJSON(string(cfg.ServerName), cfg.KeyID, cfg.PrivateKey, toSign)
|
keys.Raw, err = gomatrixserverlib.SignJSON(
|
||||||
|
string(cfg.Matrix.ServerName), cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, toSign,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ package routing
|
||||||
import (
|
import (
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/federationapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/federationapi/readers"
|
"github.com/matrix-org/dendrite/federationapi/readers"
|
||||||
"github.com/matrix-org/dendrite/federationapi/writers"
|
"github.com/matrix-org/dendrite/federationapi/writers"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -36,7 +36,7 @@ const (
|
||||||
// Setup registers HTTP handlers with the given ServeMux.
|
// Setup registers HTTP handlers with the given ServeMux.
|
||||||
func Setup(
|
func Setup(
|
||||||
servMux *http.ServeMux,
|
servMux *http.ServeMux,
|
||||||
cfg config.FederationAPI,
|
cfg config.Dendrite,
|
||||||
query api.RoomserverQueryAPI,
|
query api.RoomserverQueryAPI,
|
||||||
producer *producers.RoomserverProducer,
|
producer *producers.RoomserverProducer,
|
||||||
keys gomatrixserverlib.KeyRing,
|
keys gomatrixserverlib.KeyRing,
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/producers"
|
"github.com/matrix-org/dendrite/clientapi/producers"
|
||||||
"github.com/matrix-org/dendrite/federationapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
@ -19,13 +19,13 @@ func Send(
|
||||||
req *http.Request,
|
req *http.Request,
|
||||||
txnID gomatrixserverlib.TransactionID,
|
txnID gomatrixserverlib.TransactionID,
|
||||||
now time.Time,
|
now time.Time,
|
||||||
cfg config.FederationAPI,
|
cfg config.Dendrite,
|
||||||
query api.RoomserverQueryAPI,
|
query api.RoomserverQueryAPI,
|
||||||
producer *producers.RoomserverProducer,
|
producer *producers.RoomserverProducer,
|
||||||
keys gomatrixserverlib.KeyRing,
|
keys gomatrixserverlib.KeyRing,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys)
|
request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.Matrix.ServerName, keys)
|
||||||
if request == nil {
|
if request == nil {
|
||||||
return errResp
|
return errResp
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ func Send(
|
||||||
|
|
||||||
t.Origin = request.Origin()
|
t.Origin = request.Origin()
|
||||||
t.TransactionID = txnID
|
t.TransactionID = txnID
|
||||||
t.Destination = cfg.ServerName
|
t.Destination = cfg.Matrix.ServerName
|
||||||
|
|
||||||
resp, err := t.processTransaction()
|
resp, err := t.processTransaction()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package config
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MediaAPI contains the config information necessary to spin up a mediaapi process.
|
|
||||||
type MediaAPI struct {
|
|
||||||
// The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
|
|
||||||
ServerName gomatrixserverlib.ServerName `yaml:"server_name"`
|
|
||||||
// The base path to where the media files will be stored. May be relative or absolute.
|
|
||||||
BasePath types.Path `yaml:"base_path"`
|
|
||||||
// The absolute base path to where media files will be stored.
|
|
||||||
AbsBasePath types.Path `yaml:"-"`
|
|
||||||
// The maximum file size in bytes that is allowed to be stored on this server.
|
|
||||||
// Note: if MaxFileSizeBytes is set to 0, the size is unlimited.
|
|
||||||
// Note: if max_file_size_bytes is not set, it will default to 10485760 (10MB)
|
|
||||||
MaxFileSizeBytes *types.FileSizeBytes `yaml:"max_file_size_bytes,omitempty"`
|
|
||||||
// The postgres connection config for connecting to the database e.g a postgres:// URI
|
|
||||||
DataSource string `yaml:"database"`
|
|
||||||
// Whether to dynamically generate thumbnails on-the-fly if the requested resolution is not already generated
|
|
||||||
DynamicThumbnails bool `yaml:"dynamic_thumbnails"`
|
|
||||||
// The maximum number of simultaneous thumbnail generators. default: 10
|
|
||||||
MaxThumbnailGenerators int `yaml:"max_thumbnail_generators"`
|
|
||||||
// A list of thumbnail sizes to be pre-generated for downloaded remote / uploaded content
|
|
||||||
ThumbnailSizes []types.ThumbnailSize `yaml:"thumbnail_sizes"`
|
|
||||||
}
|
|
|
@ -26,13 +26,14 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetPathFromBase64Hash evaluates the path to a media file from its Base64Hash
|
// GetPathFromBase64Hash evaluates the path to a media file from its Base64Hash
|
||||||
// 3 subdirectories are created for more manageable browsing and use the remainder as the file name.
|
// 3 subdirectories are created for more manageable browsing and use the remainder as the file name.
|
||||||
// For example, if Base64Hash is 'qwerty', the path will be 'q/w/erty/file'.
|
// For example, if Base64Hash is 'qwerty', the path will be 'q/w/erty/file'.
|
||||||
func GetPathFromBase64Hash(base64Hash types.Base64Hash, absBasePath types.Path) (string, error) {
|
func GetPathFromBase64Hash(base64Hash types.Base64Hash, absBasePath config.Path) (string, error) {
|
||||||
if len(base64Hash) < 3 {
|
if len(base64Hash) < 3 {
|
||||||
return "", fmt.Errorf("Invalid filePath (Base64Hash too short - min 3 characters): %q", base64Hash)
|
return "", fmt.Errorf("Invalid filePath (Base64Hash too short - min 3 characters): %q", base64Hash)
|
||||||
}
|
}
|
||||||
|
@ -66,7 +67,7 @@ func GetPathFromBase64Hash(base64Hash types.Base64Hash, absBasePath types.Path)
|
||||||
// If the final path exists and the file size matches, the file does not need to be moved.
|
// If the final path exists and the file size matches, the file does not need to be moved.
|
||||||
// In error cases where the file is not a duplicate, the caller may decide to remove the final path.
|
// In error cases where the file is not a duplicate, the caller may decide to remove the final path.
|
||||||
// Returns the final path of the file, whether it is a duplicate and an error.
|
// Returns the final path of the file, whether it is a duplicate and an error.
|
||||||
func MoveFileWithHashCheck(tmpDir types.Path, mediaMetadata *types.MediaMetadata, absBasePath types.Path, logger *log.Entry) (types.Path, bool, error) {
|
func MoveFileWithHashCheck(tmpDir types.Path, mediaMetadata *types.MediaMetadata, absBasePath config.Path, logger *log.Entry) (types.Path, bool, error) {
|
||||||
// Note: in all error and success cases, we need to remove the temporary directory
|
// Note: in all error and success cases, we need to remove the temporary directory
|
||||||
defer RemoveDir(tmpDir, logger)
|
defer RemoveDir(tmpDir, logger)
|
||||||
duplicate := false
|
duplicate := false
|
||||||
|
@ -104,7 +105,7 @@ func RemoveDir(dir types.Path, logger *log.Entry) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteTempFile writes to a new temporary file
|
// WriteTempFile writes to a new temporary file
|
||||||
func WriteTempFile(reqReader io.Reader, maxFileSizeBytes types.FileSizeBytes, absBasePath types.Path) (types.Base64Hash, types.FileSizeBytes, types.Path, error) {
|
func WriteTempFile(reqReader io.Reader, maxFileSizeBytes config.FileSizeBytes, absBasePath config.Path) (types.Base64Hash, types.FileSizeBytes, types.Path, error) {
|
||||||
tmpFileWriter, tmpFile, tmpDir, err := createTempFileWriter(absBasePath)
|
tmpFileWriter, tmpFile, tmpDir, err := createTempFileWriter(absBasePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", -1, "", err
|
return "", -1, "", err
|
||||||
|
@ -144,7 +145,7 @@ func moveFile(src types.Path, dst types.Path) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createTempFileWriter(absBasePath types.Path) (*bufio.Writer, *os.File, types.Path, error) {
|
func createTempFileWriter(absBasePath config.Path) (*bufio.Writer, *os.File, types.Path, error) {
|
||||||
tmpDir, err := createTempDir(absBasePath)
|
tmpDir, err := createTempDir(absBasePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, "", fmt.Errorf("Failed to create temp dir: %q", err)
|
return nil, nil, "", fmt.Errorf("Failed to create temp dir: %q", err)
|
||||||
|
@ -157,7 +158,7 @@ func createTempFileWriter(absBasePath types.Path) (*bufio.Writer, *os.File, type
|
||||||
}
|
}
|
||||||
|
|
||||||
// createTempDir creates a tmp/<random string> directory within baseDirectory and returns its path
|
// createTempDir creates a tmp/<random string> directory within baseDirectory and returns its path
|
||||||
func createTempDir(baseDirectory types.Path) (types.Path, error) {
|
func createTempDir(baseDirectory config.Path) (types.Path, error) {
|
||||||
baseTmpDir := filepath.Join(string(baseDirectory), "tmp")
|
baseTmpDir := filepath.Join(string(baseDirectory), "tmp")
|
||||||
if err := os.MkdirAll(baseTmpDir, 0770); err != nil {
|
if err := os.MkdirAll(baseTmpDir, 0770); err != nil {
|
||||||
return "", fmt.Errorf("Failed to create base temp dir: %v", err)
|
return "", fmt.Errorf("Failed to create base temp dir: %v", err)
|
||||||
|
|
|
@ -19,7 +19,7 @@ import (
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/writers"
|
"github.com/matrix-org/dendrite/mediaapi/writers"
|
||||||
|
@ -32,7 +32,7 @@ const pathPrefixR0 = "/_matrix/media/v1"
|
||||||
|
|
||||||
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
|
||||||
// to clients which need to make outbound HTTP requests.
|
// to clients which need to make outbound HTTP requests.
|
||||||
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.MediaAPI, db *storage.Database) {
|
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.Dendrite, db *storage.Database) {
|
||||||
apiMux := mux.NewRouter()
|
apiMux := mux.NewRouter()
|
||||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.MediaAPI
|
||||||
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeDownloadAPI(name string, cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration) http.HandlerFunc {
|
func makeDownloadAPI(name string, cfg *config.Dendrite, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration) http.HandlerFunc {
|
||||||
return prometheus.InstrumentHandler(name, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
return prometheus.InstrumentHandler(name, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
req = util.RequestWithLogging(req)
|
req = util.RequestWithLogging(req)
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||||
)
|
)
|
||||||
|
@ -56,7 +57,7 @@ func GetThumbnailPath(src types.Path, config types.ThumbnailSize) types.Path {
|
||||||
// * has a small file size
|
// * has a small file size
|
||||||
// If a pre-generated thumbnail size is the best match, but it has not been generated yet, the caller can use the returned size to generate it.
|
// If a pre-generated thumbnail size is the best match, but it has not been generated yet, the caller can use the returned size to generate it.
|
||||||
// Returns nil if no thumbnail matches the criteria
|
// Returns nil if no thumbnail matches the criteria
|
||||||
func SelectThumbnail(desired types.ThumbnailSize, thumbnails []*types.ThumbnailMetadata, thumbnailSizes []types.ThumbnailSize) (*types.ThumbnailMetadata, *types.ThumbnailSize) {
|
func SelectThumbnail(desired types.ThumbnailSize, thumbnails []*types.ThumbnailMetadata, thumbnailSizes []config.ThumbnailSize) (*types.ThumbnailMetadata, *types.ThumbnailSize) {
|
||||||
var chosenThumbnail *types.ThumbnailMetadata
|
var chosenThumbnail *types.ThumbnailMetadata
|
||||||
var chosenThumbnailSize *types.ThumbnailSize
|
var chosenThumbnailSize *types.ThumbnailSize
|
||||||
bestFit := newThumbnailFitness()
|
bestFit := newThumbnailFitness()
|
||||||
|
@ -76,7 +77,7 @@ func SelectThumbnail(desired types.ThumbnailSize, thumbnails []*types.ThumbnailM
|
||||||
if desired.ResizeMethod == "scale" && thumbnailSize.ResizeMethod != "scale" {
|
if desired.ResizeMethod == "scale" && thumbnailSize.ResizeMethod != "scale" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fitness := calcThumbnailFitness(thumbnailSize, nil, desired)
|
fitness := calcThumbnailFitness(types.ThumbnailSize(thumbnailSize), nil, desired)
|
||||||
if isBetter := fitness.betterThan(bestFit, desired.ResizeMethod == "crop"); isBetter {
|
if isBetter := fitness.betterThan(bestFit, desired.ResizeMethod == "crop"); isBetter {
|
||||||
bestFit = fitness
|
bestFit = fitness
|
||||||
chosenThumbnailSize = &types.ThumbnailSize{
|
chosenThumbnailSize = &types.ThumbnailSize{
|
||||||
|
|
|
@ -21,13 +21,14 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||||
"gopkg.in/h2non/bimg.v1"
|
"gopkg.in/h2non/bimg.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GenerateThumbnails generates the configured thumbnail sizes for the source file
|
// GenerateThumbnails generates the configured thumbnail sizes for the source file
|
||||||
func GenerateThumbnails(src types.Path, configs []types.ThumbnailSize, mediaMetadata *types.MediaMetadata, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, logger *log.Entry) (busy bool, errorReturn error) {
|
func GenerateThumbnails(src types.Path, configs []config.ThumbnailSize, mediaMetadata *types.MediaMetadata, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, logger *log.Entry) (busy bool, errorReturn error) {
|
||||||
buffer, err := bimg.Read(string(src))
|
buffer, err := bimg.Read(string(src))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("src", src).Error("Failed to read src file")
|
logger.WithError(err).WithField("src", src).Error("Failed to read src file")
|
||||||
|
|
|
@ -28,13 +28,14 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/types"
|
"github.com/matrix-org/dendrite/mediaapi/types"
|
||||||
"github.com/nfnt/resize"
|
"github.com/nfnt/resize"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GenerateThumbnails generates the configured thumbnail sizes for the source file
|
// GenerateThumbnails generates the configured thumbnail sizes for the source file
|
||||||
func GenerateThumbnails(src types.Path, configs []types.ThumbnailSize, mediaMetadata *types.MediaMetadata, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, logger *log.Entry) (busy bool, errorReturn error) {
|
func GenerateThumbnails(src types.Path, configs []config.ThumbnailSize, mediaMetadata *types.MediaMetadata, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, logger *log.Entry) (busy bool, errorReturn error) {
|
||||||
img, err := readFile(string(src))
|
img, err := readFile(string(src))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("src", src).Error("Failed to read src file")
|
logger.WithError(err).WithField("src", src).Error("Failed to read src file")
|
||||||
|
@ -42,7 +43,7 @@ func GenerateThumbnails(src types.Path, configs []types.ThumbnailSize, mediaMeta
|
||||||
}
|
}
|
||||||
for _, config := range configs {
|
for _, config := range configs {
|
||||||
// Note: createThumbnail does locking based on activeThumbnailGeneration
|
// Note: createThumbnail does locking based on activeThumbnailGeneration
|
||||||
busy, err = createThumbnail(src, img, config, mediaMetadata, activeThumbnailGeneration, maxThumbnailGenerators, db, logger)
|
busy, err = createThumbnail(src, img, types.ThumbnailSize(config), mediaMetadata, activeThumbnailGeneration, maxThumbnailGenerators, db, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("src", src).Error("Failed to generate thumbnails")
|
logger.WithError(err).WithField("src", src).Error("Failed to generate thumbnails")
|
||||||
return false, err
|
return false, err
|
||||||
|
|
|
@ -17,6 +17,7 @@ package types
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
@ -79,16 +80,7 @@ type ActiveRemoteRequests struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ThumbnailSize contains a single thumbnail size configuration
|
// ThumbnailSize contains a single thumbnail size configuration
|
||||||
type ThumbnailSize struct {
|
type ThumbnailSize config.ThumbnailSize
|
||||||
// Maximum width of the thumbnail image
|
|
||||||
Width int `yaml:"width"`
|
|
||||||
// Maximum height of the thumbnail image
|
|
||||||
Height int `yaml:"height"`
|
|
||||||
// ResizeMethod is one of crop or scale.
|
|
||||||
// crop scales to fill the requested dimensions and crops the excess.
|
|
||||||
// scale scales to fit the requested dimensions and one dimension may be smaller than requested.
|
|
||||||
ResizeMethod string `yaml:"method,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ThumbnailMetadata contains the metadata about an individual thumbnail
|
// ThumbnailMetadata contains the metadata about an individual thumbnail
|
||||||
type ThumbnailMetadata struct {
|
type ThumbnailMetadata struct {
|
||||||
|
|
|
@ -29,7 +29,7 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/fileutils"
|
"github.com/matrix-org/dendrite/mediaapi/fileutils"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/thumbnailer"
|
"github.com/matrix-org/dendrite/mediaapi/thumbnailer"
|
||||||
|
@ -59,7 +59,7 @@ type downloadRequest struct {
|
||||||
// If they are present in the cache, they are served directly.
|
// If they are present in the cache, they are served directly.
|
||||||
// If they are not present in the cache, they are obtained from the remote server and
|
// If they are not present in the cache, they are obtained from the remote server and
|
||||||
// simultaneously served back to the client and written into the cache.
|
// simultaneously served back to the client and written into the cache.
|
||||||
func Download(w http.ResponseWriter, req *http.Request, origin gomatrixserverlib.ServerName, mediaID types.MediaID, cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration, isThumbnailRequest bool) {
|
func Download(w http.ResponseWriter, req *http.Request, origin gomatrixserverlib.ServerName, mediaID types.MediaID, cfg *config.Dendrite, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration, isThumbnailRequest bool) {
|
||||||
r := &downloadRequest{
|
r := &downloadRequest{
|
||||||
MediaMetadata: &types.MediaMetadata{
|
MediaMetadata: &types.MediaMetadata{
|
||||||
MediaID: mediaID,
|
MediaID: mediaID,
|
||||||
|
@ -167,7 +167,7 @@ func (r *downloadRequest) Validate() *util.JSONResponse {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration) *util.JSONResponse {
|
func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.Dendrite, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration) *util.JSONResponse {
|
||||||
// check if we have a record of the media in our database
|
// check if we have a record of the media in our database
|
||||||
mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin)
|
mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -176,7 +176,7 @@ func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI
|
||||||
return &resErr
|
return &resErr
|
||||||
}
|
}
|
||||||
if mediaMetadata == nil {
|
if mediaMetadata == nil {
|
||||||
if r.MediaMetadata.Origin == cfg.ServerName {
|
if r.MediaMetadata.Origin == cfg.Matrix.ServerName {
|
||||||
// If we do not have a record and the origin is local, the file is not found
|
// If we do not have a record and the origin is local, the file is not found
|
||||||
return &util.JSONResponse{
|
return &util.JSONResponse{
|
||||||
Code: 404,
|
Code: 404,
|
||||||
|
@ -192,12 +192,12 @@ func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI
|
||||||
// If we have a record, we can respond from the local file
|
// If we have a record, we can respond from the local file
|
||||||
r.MediaMetadata = mediaMetadata
|
r.MediaMetadata = mediaMetadata
|
||||||
}
|
}
|
||||||
return r.respondFromLocalFile(w, cfg.AbsBasePath, activeThumbnailGeneration, cfg.MaxThumbnailGenerators, db, cfg.DynamicThumbnails, cfg.ThumbnailSizes)
|
return r.respondFromLocalFile(w, cfg.Media.AbsBasePath, activeThumbnailGeneration, cfg.Media.MaxThumbnailGenerators, db, cfg.Media.DynamicThumbnails, cfg.Media.ThumbnailSizes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// respondFromLocalFile reads a file from local storage and writes it to the http.ResponseWriter
|
// respondFromLocalFile reads a file from local storage and writes it to the http.ResponseWriter
|
||||||
// Returns a util.JSONResponse error in case of error
|
// Returns a util.JSONResponse error in case of error
|
||||||
func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePath types.Path, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, dynamicThumbnails bool, thumbnailSizes []types.ThumbnailSize) *util.JSONResponse {
|
func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePath config.Path, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, dynamicThumbnails bool, thumbnailSizes []config.ThumbnailSize) *util.JSONResponse {
|
||||||
filePath, err := fileutils.GetPathFromBase64Hash(r.MediaMetadata.Base64Hash, absBasePath)
|
filePath, err := fileutils.GetPathFromBase64Hash(r.MediaMetadata.Base64Hash, absBasePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.WithError(err).Error("Failed to get file path from metadata")
|
r.Logger.WithError(err).Error("Failed to get file path from metadata")
|
||||||
|
@ -284,7 +284,7 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: Thumbnail generation may be ongoing asynchronously.
|
// Note: Thumbnail generation may be ongoing asynchronously.
|
||||||
func (r *downloadRequest) getThumbnailFile(filePath types.Path, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, dynamicThumbnails bool, thumbnailSizes []types.ThumbnailSize) (*os.File, *types.ThumbnailMetadata, *util.JSONResponse) {
|
func (r *downloadRequest) getThumbnailFile(filePath types.Path, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int, db *storage.Database, dynamicThumbnails bool, thumbnailSizes []config.ThumbnailSize) (*os.File, *types.ThumbnailMetadata, *util.JSONResponse) {
|
||||||
var thumbnail *types.ThumbnailMetadata
|
var thumbnail *types.ThumbnailMetadata
|
||||||
var resErr *util.JSONResponse
|
var resErr *util.JSONResponse
|
||||||
|
|
||||||
|
@ -383,7 +383,7 @@ func (r *downloadRequest) generateThumbnail(filePath types.Path, thumbnailSize t
|
||||||
// regardless of how many download requests are received.
|
// regardless of how many download requests are received.
|
||||||
// Note: The named errorResponse return variable is used in a deferred broadcast of the metadata and error response to waiting goroutines.
|
// Note: The named errorResponse return variable is used in a deferred broadcast of the metadata and error response to waiting goroutines.
|
||||||
// Returns a util.JSONResponse error in case of error
|
// Returns a util.JSONResponse error in case of error
|
||||||
func (r *downloadRequest) getRemoteFile(cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration) (errorResponse *util.JSONResponse) {
|
func (r *downloadRequest) getRemoteFile(cfg *config.Dendrite, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests, activeThumbnailGeneration *types.ActiveThumbnailGeneration) (errorResponse *util.JSONResponse) {
|
||||||
// Note: getMediaMetadataFromActiveRequest uses mutexes and conditions from activeRemoteRequests
|
// Note: getMediaMetadataFromActiveRequest uses mutexes and conditions from activeRemoteRequests
|
||||||
mediaMetadata, resErr := r.getMediaMetadataFromActiveRequest(activeRemoteRequests)
|
mediaMetadata, resErr := r.getMediaMetadataFromActiveRequest(activeRemoteRequests)
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
|
@ -414,7 +414,7 @@ func (r *downloadRequest) getRemoteFile(cfg *config.MediaAPI, db *storage.Databa
|
||||||
|
|
||||||
if mediaMetadata == nil {
|
if mediaMetadata == nil {
|
||||||
// If we do not have a record, we need to fetch the remote file first and then respond from the local file
|
// If we do not have a record, we need to fetch the remote file first and then respond from the local file
|
||||||
resErr := r.fetchRemoteFileAndStoreMetadata(cfg.AbsBasePath, *cfg.MaxFileSizeBytes, db, cfg.ThumbnailSizes, activeThumbnailGeneration, cfg.MaxThumbnailGenerators)
|
resErr := r.fetchRemoteFileAndStoreMetadata(cfg.Media.AbsBasePath, *cfg.Media.MaxFileSizeBytes, db, cfg.Media.ThumbnailSizes, activeThumbnailGeneration, cfg.Media.MaxThumbnailGenerators)
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
return resErr
|
return resErr
|
||||||
}
|
}
|
||||||
|
@ -476,7 +476,7 @@ func (r *downloadRequest) broadcastMediaMetadata(activeRemoteRequests *types.Act
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchRemoteFileAndStoreMetadata fetches the file from the remote server and stores its metadata in the database
|
// fetchRemoteFileAndStoreMetadata fetches the file from the remote server and stores its metadata in the database
|
||||||
func (r *downloadRequest) fetchRemoteFileAndStoreMetadata(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes, db *storage.Database, thumbnailSizes []types.ThumbnailSize, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int) *util.JSONResponse {
|
func (r *downloadRequest) fetchRemoteFileAndStoreMetadata(absBasePath config.Path, maxFileSizeBytes config.FileSizeBytes, db *storage.Database, thumbnailSizes []config.ThumbnailSize, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int) *util.JSONResponse {
|
||||||
finalPath, duplicate, resErr := r.fetchRemoteFile(absBasePath, maxFileSizeBytes)
|
finalPath, duplicate, resErr := r.fetchRemoteFile(absBasePath, maxFileSizeBytes)
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
return resErr
|
return resErr
|
||||||
|
@ -524,7 +524,7 @@ func (r *downloadRequest) fetchRemoteFileAndStoreMetadata(absBasePath types.Path
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *downloadRequest) fetchRemoteFile(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes) (types.Path, bool, *util.JSONResponse) {
|
func (r *downloadRequest) fetchRemoteFile(absBasePath config.Path, maxFileSizeBytes config.FileSizeBytes) (types.Path, bool, *util.JSONResponse) {
|
||||||
r.Logger.Info("Fetching remote file")
|
r.Logger.Info("Fetching remote file")
|
||||||
|
|
||||||
// create request for remote file
|
// create request for remote file
|
||||||
|
|
|
@ -24,7 +24,7 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/fileutils"
|
"github.com/matrix-org/dendrite/mediaapi/fileutils"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/storage"
|
"github.com/matrix-org/dendrite/mediaapi/storage"
|
||||||
"github.com/matrix-org/dendrite/mediaapi/thumbnailer"
|
"github.com/matrix-org/dendrite/mediaapi/thumbnailer"
|
||||||
|
@ -51,7 +51,7 @@ type uploadResponse struct {
|
||||||
// This implementation supports a configurable maximum file size limit in bytes. If a user tries to upload more than this, they will receive an error that their upload is too large.
|
// This implementation supports a configurable maximum file size limit in bytes. If a user tries to upload more than this, they will receive an error that their upload is too large.
|
||||||
// Uploaded files are processed piece-wise to avoid DoS attacks which would starve the server of memory.
|
// Uploaded files are processed piece-wise to avoid DoS attacks which would starve the server of memory.
|
||||||
// TODO: We should time out requests if they have not received any data within a configured timeout period.
|
// TODO: We should time out requests if they have not received any data within a configured timeout period.
|
||||||
func Upload(req *http.Request, cfg *config.MediaAPI, db *storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) util.JSONResponse {
|
func Upload(req *http.Request, cfg *config.Dendrite, db *storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) util.JSONResponse {
|
||||||
r, resErr := parseAndValidateRequest(req, cfg)
|
r, resErr := parseAndValidateRequest(req, cfg)
|
||||||
if resErr != nil {
|
if resErr != nil {
|
||||||
return *resErr
|
return *resErr
|
||||||
|
@ -64,7 +64,7 @@ func Upload(req *http.Request, cfg *config.MediaAPI, db *storage.Database, activ
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: uploadResponse{
|
JSON: uploadResponse{
|
||||||
ContentURI: fmt.Sprintf("mxc://%s/%s", cfg.ServerName, r.MediaMetadata.MediaID),
|
ContentURI: fmt.Sprintf("mxc://%s/%s", cfg.Matrix.ServerName, r.MediaMetadata.MediaID),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ func Upload(req *http.Request, cfg *config.MediaAPI, db *storage.Database, activ
|
||||||
// parseAndValidateRequest parses the incoming upload request to validate and extract
|
// parseAndValidateRequest parses the incoming upload request to validate and extract
|
||||||
// all the metadata about the media being uploaded.
|
// all the metadata about the media being uploaded.
|
||||||
// Returns either an uploadRequest or an error formatted as a util.JSONResponse
|
// Returns either an uploadRequest or an error formatted as a util.JSONResponse
|
||||||
func parseAndValidateRequest(req *http.Request, cfg *config.MediaAPI) (*uploadRequest, *util.JSONResponse) {
|
func parseAndValidateRequest(req *http.Request, cfg *config.Dendrite) (*uploadRequest, *util.JSONResponse) {
|
||||||
if req.Method != "POST" {
|
if req.Method != "POST" {
|
||||||
return nil, &util.JSONResponse{
|
return nil, &util.JSONResponse{
|
||||||
Code: 405,
|
Code: 405,
|
||||||
|
@ -82,22 +82,22 @@ func parseAndValidateRequest(req *http.Request, cfg *config.MediaAPI) (*uploadRe
|
||||||
|
|
||||||
r := &uploadRequest{
|
r := &uploadRequest{
|
||||||
MediaMetadata: &types.MediaMetadata{
|
MediaMetadata: &types.MediaMetadata{
|
||||||
Origin: cfg.ServerName,
|
Origin: cfg.Matrix.ServerName,
|
||||||
FileSizeBytes: types.FileSizeBytes(req.ContentLength),
|
FileSizeBytes: types.FileSizeBytes(req.ContentLength),
|
||||||
ContentType: types.ContentType(req.Header.Get("Content-Type")),
|
ContentType: types.ContentType(req.Header.Get("Content-Type")),
|
||||||
UploadName: types.Filename(url.PathEscape(req.FormValue("filename"))),
|
UploadName: types.Filename(url.PathEscape(req.FormValue("filename"))),
|
||||||
},
|
},
|
||||||
Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.ServerName),
|
Logger: util.GetLogger(req.Context()).WithField("Origin", cfg.Matrix.ServerName),
|
||||||
}
|
}
|
||||||
|
|
||||||
if resErr := r.Validate(*cfg.MaxFileSizeBytes); resErr != nil {
|
if resErr := r.Validate(*cfg.Media.MaxFileSizeBytes); resErr != nil {
|
||||||
return nil, resErr
|
return nil, resErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *uploadRequest) doUpload(reqReader io.Reader, cfg *config.MediaAPI, db *storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) *util.JSONResponse {
|
func (r *uploadRequest) doUpload(reqReader io.Reader, cfg *config.Dendrite, db *storage.Database, activeThumbnailGeneration *types.ActiveThumbnailGeneration) *util.JSONResponse {
|
||||||
r.Logger.WithFields(log.Fields{
|
r.Logger.WithFields(log.Fields{
|
||||||
"UploadName": r.MediaMetadata.UploadName,
|
"UploadName": r.MediaMetadata.UploadName,
|
||||||
"FileSizeBytes": r.MediaMetadata.FileSizeBytes,
|
"FileSizeBytes": r.MediaMetadata.FileSizeBytes,
|
||||||
|
@ -108,10 +108,10 @@ func (r *uploadRequest) doUpload(reqReader io.Reader, cfg *config.MediaAPI, db *
|
||||||
// method of deduplicating files to save storage, as well as a way to conduct
|
// method of deduplicating files to save storage, as well as a way to conduct
|
||||||
// integrity checks on the file data in the repository.
|
// integrity checks on the file data in the repository.
|
||||||
// Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK.
|
// Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK.
|
||||||
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(reqReader, *cfg.MaxFileSizeBytes, cfg.AbsBasePath)
|
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(reqReader, *cfg.Media.MaxFileSizeBytes, cfg.Media.AbsBasePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.WithError(err).WithFields(log.Fields{
|
r.Logger.WithError(err).WithFields(log.Fields{
|
||||||
"MaxFileSizeBytes": *cfg.MaxFileSizeBytes,
|
"MaxFileSizeBytes": *cfg.Media.MaxFileSizeBytes,
|
||||||
}).Warn("Error while transferring file")
|
}).Warn("Error while transferring file")
|
||||||
fileutils.RemoveDir(tmpDir, r.Logger)
|
fileutils.RemoveDir(tmpDir, r.Logger)
|
||||||
return &util.JSONResponse{
|
return &util.JSONResponse{
|
||||||
|
@ -147,12 +147,12 @@ func (r *uploadRequest) doUpload(reqReader io.Reader, cfg *config.MediaAPI, db *
|
||||||
return &util.JSONResponse{
|
return &util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: uploadResponse{
|
JSON: uploadResponse{
|
||||||
ContentURI: fmt.Sprintf("mxc://%s/%s", cfg.ServerName, r.MediaMetadata.MediaID),
|
ContentURI: fmt.Sprintf("mxc://%s/%s", cfg.Matrix.ServerName, r.MediaMetadata.MediaID),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if resErr := r.storeFileAndMetadata(tmpDir, cfg.AbsBasePath, db, cfg.ThumbnailSizes, activeThumbnailGeneration, cfg.MaxThumbnailGenerators); resErr != nil {
|
if resErr := r.storeFileAndMetadata(tmpDir, cfg.Media.AbsBasePath, db, cfg.Media.ThumbnailSizes, activeThumbnailGeneration, cfg.Media.MaxThumbnailGenerators); resErr != nil {
|
||||||
return resErr
|
return resErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,14 +160,14 @@ func (r *uploadRequest) doUpload(reqReader io.Reader, cfg *config.MediaAPI, db *
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate validates the uploadRequest fields
|
// Validate validates the uploadRequest fields
|
||||||
func (r *uploadRequest) Validate(maxFileSizeBytes types.FileSizeBytes) *util.JSONResponse {
|
func (r *uploadRequest) Validate(maxFileSizeBytes config.FileSizeBytes) *util.JSONResponse {
|
||||||
if r.MediaMetadata.FileSizeBytes < 1 {
|
if r.MediaMetadata.FileSizeBytes < 1 {
|
||||||
return &util.JSONResponse{
|
return &util.JSONResponse{
|
||||||
Code: 411,
|
Code: 411,
|
||||||
JSON: jsonerror.Unknown("HTTP Content-Length request header must be greater than zero."),
|
JSON: jsonerror.Unknown("HTTP Content-Length request header must be greater than zero."),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if maxFileSizeBytes > 0 && r.MediaMetadata.FileSizeBytes > maxFileSizeBytes {
|
if maxFileSizeBytes > 0 && r.MediaMetadata.FileSizeBytes > types.FileSizeBytes(maxFileSizeBytes) {
|
||||||
return &util.JSONResponse{
|
return &util.JSONResponse{
|
||||||
Code: 413,
|
Code: 413,
|
||||||
JSON: jsonerror.Unknown(fmt.Sprintf("HTTP Content-Length is greater than the maximum allowed upload size (%v).", maxFileSizeBytes)),
|
JSON: jsonerror.Unknown(fmt.Sprintf("HTTP Content-Length is greater than the maximum allowed upload size (%v).", maxFileSizeBytes)),
|
||||||
|
@ -215,7 +215,7 @@ func (r *uploadRequest) Validate(maxFileSizeBytes types.FileSizeBytes) *util.JSO
|
||||||
// The order of operations is important as it avoids metadata entering the database before the file
|
// The order of operations is important as it avoids metadata entering the database before the file
|
||||||
// is ready, and if we fail to move the file, it never gets added to the database.
|
// is ready, and if we fail to move the file, it never gets added to the database.
|
||||||
// Returns a util.JSONResponse error and cleans up directories in case of error.
|
// Returns a util.JSONResponse error and cleans up directories in case of error.
|
||||||
func (r *uploadRequest) storeFileAndMetadata(tmpDir types.Path, absBasePath types.Path, db *storage.Database, thumbnailSizes []types.ThumbnailSize, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int) *util.JSONResponse {
|
func (r *uploadRequest) storeFileAndMetadata(tmpDir types.Path, absBasePath config.Path, db *storage.Database, thumbnailSizes []config.ThumbnailSize, activeThumbnailGeneration *types.ActiveThumbnailGeneration, maxThumbnailGenerators int) *util.JSONResponse {
|
||||||
finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger)
|
finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.WithError(err).Error("Failed to move file.")
|
r.Logger.WithError(err).Error("Failed to move file.")
|
||||||
|
|
|
@ -1,33 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package config
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Sync contains the config information necessary to spin up a sync-server process.
|
|
||||||
type Sync struct {
|
|
||||||
// Where the room server is listening for queries.
|
|
||||||
RoomserverURL string `yaml:"roomserver_url"`
|
|
||||||
// The topic for events which are written by the room server output log.
|
|
||||||
RoomserverOutputTopic string `yaml:"roomserver_topic"`
|
|
||||||
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server.
|
|
||||||
KafkaConsumerURIs []string `yaml:"consumer_uris"`
|
|
||||||
// The postgres connection config for connecting to the database e.g a postgres:// URI
|
|
||||||
DataSource string `yaml:"database"`
|
|
||||||
// The server_name of the running process e.g "localhost"
|
|
||||||
ServerName gomatrixserverlib.ServerName `yaml:"server_name"`
|
|
||||||
}
|
|
|
@ -20,8 +20,8 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
@ -38,14 +38,15 @@ type OutputRoomEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
|
func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
|
||||||
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
roomServerURL := cfg.RoomServerURL()
|
||||||
|
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: cfg.RoomserverOutputTopic,
|
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
@ -53,7 +54,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil),
|
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -30,7 +29,7 @@ import (
|
||||||
const pathPrefixR0 = "/_matrix/client/r0"
|
const pathPrefixR0 = "/_matrix/client/r0"
|
||||||
|
|
||||||
// SetupSyncServerListeners configures the given mux with sync-server listeners
|
// SetupSyncServerListeners configures the given mux with sync-server listeners
|
||||||
func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp *sync.RequestPool, deviceDB *devices.Database) {
|
func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, srp *sync.RequestPool, deviceDB *devices.Database) {
|
||||||
apiMux := mux.NewRouter()
|
apiMux := mux.NewRouter()
|
||||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||||
r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
|
Loading…
Reference in New Issue