Cross-signing groundwork (#1953)
* Cross-signing groundwork * Update to matrix-org/gomatrixserverlib#274 * Fix gobind builds, which stops unit tests in CI from yelling * Some changes from review comments * Fix build by passing in UIA * Update to matrix-org/gomatrixserverlib@bec8d22 * Process master/self-signing keys from devices call * nolint * Enum-ify the key type in the database * Process self-signing key too * Fix sanity check in device list updater * Fix check * Fix sytest, hopefully * Fix buildmain
parent
4cc8b28b7f
commit
eb0efa4636
|
@ -299,7 +299,7 @@ func (m *DendriteMonolith) Start() {
|
|||
base, federation, rsAPI, keyRing, true,
|
||||
)
|
||||
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||
m.userAPI = userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
||||
keyAPI.SetUserAPI(m.userAPI)
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ func (m *DendriteMonolith) Start() {
|
|||
base, federation, rsAPI, keyRing, true,
|
||||
)
|
||||
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login,
|
|||
if username == "" {
|
||||
return nil, &util.JSONResponse{
|
||||
Code: http.StatusUnauthorized,
|
||||
JSON: jsonerror.BadJSON("'user' must be supplied."),
|
||||
JSON: jsonerror.BadJSON("A username must be supplied."),
|
||||
}
|
||||
}
|
||||
localpart, err := userutil.ParseUsernameParam(username, &t.Config.Matrix.ServerName)
|
||||
|
@ -68,7 +68,7 @@ func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login,
|
|||
// but that would leak the existence of the user.
|
||||
return nil, &util.JSONResponse{
|
||||
Code: http.StatusForbidden,
|
||||
JSON: jsonerror.Forbidden("username or password was incorrect, or the account does not exist"),
|
||||
JSON: jsonerror.Forbidden("The username or password was incorrect or the account does not exist."),
|
||||
}
|
||||
}
|
||||
return &r.Login, nil
|
||||
|
|
|
@ -220,7 +220,7 @@ func (u *UserInteractive) Verify(ctx context.Context, bodyBytes []byte, device *
|
|||
if !ok {
|
||||
return nil, &util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.BadJSON("unknown auth.type: " + authType),
|
||||
JSON: jsonerror.BadJSON("Unknown auth.type: " + authType),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,7 +231,7 @@ func (u *UserInteractive) Verify(ctx context.Context, bodyBytes []byte, device *
|
|||
if !u.IsSingleStageFlow(authType) {
|
||||
return nil, &util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.Unknown("missing or unknown auth.session"),
|
||||
JSON: jsonerror.Unknown("The auth.session is missing or unknown."),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,6 +125,18 @@ func GuestAccessForbidden(msg string) *MatrixError {
|
|||
return &MatrixError{"M_GUEST_ACCESS_FORBIDDEN", msg}
|
||||
}
|
||||
|
||||
// InvalidSignature is an error which is returned when the client tries
|
||||
// to upload invalid signatures.
|
||||
func InvalidSignature(msg string) *MatrixError {
|
||||
return &MatrixError{"M_INVALID_SIGNATURE", msg}
|
||||
}
|
||||
|
||||
// MissingParam is an error that is returned when a parameter was incorrect,
|
||||
// traditionally with cross-signing.
|
||||
func MissingParam(msg string) *MatrixError {
|
||||
return &MatrixError{"M_MISSING_PARAM", msg}
|
||||
}
|
||||
|
||||
type IncompatibleRoomVersionError struct {
|
||||
RoomVersion string `json:"room_version"`
|
||||
Error string `json:"error"`
|
||||
|
|
|
@ -0,0 +1,125 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package routing
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
func UploadCrossSigningDeviceKeys(
|
||||
req *http.Request, userInteractiveAuth *auth.UserInteractive,
|
||||
keyserverAPI api.KeyInternalAPI, device *userapi.Device,
|
||||
accountDB accounts.Database, cfg *config.ClientAPI,
|
||||
) util.JSONResponse {
|
||||
uploadReq := &api.PerformUploadDeviceKeysRequest{}
|
||||
uploadRes := &api.PerformUploadDeviceKeysResponse{}
|
||||
|
||||
ctx := req.Context()
|
||||
defer req.Body.Close() // nolint:errcheck
|
||||
bodyBytes, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.BadJSON("The request body could not be read: " + err.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := userInteractiveAuth.Verify(ctx, bodyBytes, device); err != nil {
|
||||
return *err
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(bodyBytes, &uploadReq); err != nil {
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.BadJSON("The request body could not be unmarshalled: " + err.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
uploadReq.UserID = device.UserID
|
||||
keyserverAPI.PerformUploadDeviceKeys(req.Context(), uploadReq, uploadRes)
|
||||
|
||||
if err := uploadRes.Error; err != nil {
|
||||
switch {
|
||||
case err.IsInvalidSignature:
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.InvalidSignature(err.Error()),
|
||||
}
|
||||
case err.IsMissingParam:
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.MissingParam(err.Error()),
|
||||
}
|
||||
default:
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.Unknown(err.Error()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
func UploadCrossSigningDeviceSignatures(req *http.Request, keyserverAPI api.KeyInternalAPI, device *userapi.Device) util.JSONResponse {
|
||||
uploadReq := &api.PerformUploadDeviceSignaturesRequest{}
|
||||
uploadRes := &api.PerformUploadDeviceSignaturesResponse{}
|
||||
|
||||
if err := httputil.UnmarshalJSONRequest(req, &uploadReq.Signatures); err != nil {
|
||||
return *err
|
||||
}
|
||||
|
||||
uploadReq.UserID = device.UserID
|
||||
keyserverAPI.PerformUploadDeviceSignatures(req.Context(), uploadReq, uploadRes)
|
||||
|
||||
if err := uploadRes.Error; err != nil {
|
||||
switch {
|
||||
case err.IsInvalidSignature:
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.InvalidSignature(err.Error()),
|
||||
}
|
||||
case err.IsMissingParam:
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.MissingParam(err.Error()),
|
||||
}
|
||||
default:
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.Unknown(err.Error()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
|
@ -100,7 +100,7 @@ func (r *queryKeysRequest) GetTimeout() time.Duration {
|
|||
return time.Duration(r.Timeout) * time.Millisecond
|
||||
}
|
||||
|
||||
func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI) util.JSONResponse {
|
||||
func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI, device *userapi.Device) util.JSONResponse {
|
||||
var r queryKeysRequest
|
||||
resErr := httputil.UnmarshalJSONRequest(req, &r)
|
||||
if resErr != nil {
|
||||
|
@ -108,6 +108,7 @@ func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI) util.JSONResponse {
|
|||
}
|
||||
queryRes := api.QueryKeysResponse{}
|
||||
keyAPI.QueryKeys(req.Context(), &api.QueryKeysRequest{
|
||||
UserID: device.UserID,
|
||||
UserToDevices: r.DeviceKeys,
|
||||
Timeout: r.GetTimeout(),
|
||||
// TODO: Token?
|
||||
|
@ -115,8 +116,11 @@ func QueryKeys(req *http.Request, keyAPI api.KeyInternalAPI) util.JSONResponse {
|
|||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: map[string]interface{}{
|
||||
"device_keys": queryRes.DeviceKeys,
|
||||
"failures": queryRes.Failures,
|
||||
"device_keys": queryRes.DeviceKeys,
|
||||
"master_keys": queryRes.MasterKeys,
|
||||
"self_signing_keys": queryRes.SelfSigningKeys,
|
||||
"user_signing_keys": queryRes.UserSigningKeys,
|
||||
"failures": queryRes.Failures,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,9 @@ func Setup(
|
|||
rateLimits := newRateLimits(&cfg.RateLimiting)
|
||||
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
|
||||
|
||||
unstableFeatures := make(map[string]bool)
|
||||
unstableFeatures := map[string]bool{
|
||||
//"org.matrix.e2e_cross_signing": true,
|
||||
}
|
||||
for _, msc := range cfg.MSCs.MSCs {
|
||||
unstableFeatures["org.matrix."+msc] = true
|
||||
}
|
||||
|
@ -1066,6 +1068,22 @@ func Setup(
|
|||
|
||||
// Deleting E2E Backup Keys
|
||||
|
||||
// Cross-signing device keys
|
||||
|
||||
postDeviceSigningKeys := httputil.MakeAuthAPI("post_device_signing_keys", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
return UploadCrossSigningDeviceKeys(req, userInteractiveAuth, keyAPI, device, accountDB, cfg)
|
||||
})
|
||||
|
||||
postDeviceSigningSignatures := httputil.MakeAuthAPI("post_device_signing_signatures", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
return UploadCrossSigningDeviceSignatures(req, keyAPI, device)
|
||||
})
|
||||
|
||||
r0mux.Handle("/keys/device_signing/upload", postDeviceSigningKeys).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/keys/signatures/upload", postDeviceSigningSignatures).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
unstableMux.Handle("/keys/device_signing/upload", postDeviceSigningKeys).Methods(http.MethodPost, http.MethodOptions)
|
||||
unstableMux.Handle("/keys/signatures/upload", postDeviceSigningSignatures).Methods(http.MethodPost, http.MethodOptions)
|
||||
|
||||
// Supplying a device ID is deprecated.
|
||||
r0mux.Handle("/keys/upload/{deviceID}",
|
||||
httputil.MakeAuthAPI("keys_upload", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
|
@ -1079,7 +1097,7 @@ func Setup(
|
|||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/keys/query",
|
||||
httputil.MakeAuthAPI("keys_query", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
return QueryKeys(req, keyAPI)
|
||||
return QueryKeys(req, keyAPI, device)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/keys/claim",
|
||||
|
|
|
@ -147,7 +147,7 @@ func main() {
|
|||
|
||||
accountDB := base.Base.CreateAccountsDB()
|
||||
federation := createFederationClient(base)
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Base.Cfg.KeyServer, federation)
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Base, &base.Base.Cfg.KeyServer, federation)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
|
|
|
@ -179,7 +179,7 @@ func main() {
|
|||
base, federation, rsAPI, keyRing, true,
|
||||
)
|
||||
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ func main() {
|
|||
serverKeyAPI := &signing.YggdrasilKeys{}
|
||||
keyRing := serverKeyAPI.KeyRing()
|
||||
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
|
|
|
@ -112,7 +112,7 @@ func main() {
|
|||
// This is different to rsAPI which can be the http client which doesn't need this dependency
|
||||
rsImpl.SetFederationSenderAPI(fsAPI)
|
||||
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
if traceInternal {
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
|
||||
func KeyServer(base *setup.BaseDendrite, cfg *config.Dendrite) {
|
||||
fsAPI := base.FederationSenderHTTPClient()
|
||||
intAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
|
||||
intAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
|
||||
intAPI.SetUserAPI(base.UserAPIClient())
|
||||
|
||||
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
|
||||
|
|
|
@ -184,7 +184,7 @@ func startup() {
|
|||
|
||||
accountDB := base.CreateAccountsDB()
|
||||
federation := conn.CreateFederationClient(base, pSessions)
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
|
|
|
@ -192,7 +192,7 @@ func main() {
|
|||
|
||||
accountDB := base.CreateAccountsDB()
|
||||
federation := createFederationClient(cfg, node)
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
|
|
|
@ -32,6 +32,8 @@ type KeyInternalAPI interface {
|
|||
PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse)
|
||||
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
||||
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
|
||||
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
|
||||
PerformUploadDeviceSignatures(ctx context.Context, req *PerformUploadDeviceSignaturesRequest, res *PerformUploadDeviceSignaturesResponse)
|
||||
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
|
||||
QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse)
|
||||
QueryOneTimeKeys(ctx context.Context, req *QueryOneTimeKeysRequest, res *QueryOneTimeKeysResponse)
|
||||
|
@ -40,7 +42,9 @@ type KeyInternalAPI interface {
|
|||
|
||||
// KeyError is returned if there was a problem performing/querying the server
|
||||
type KeyError struct {
|
||||
Err string
|
||||
Err string `json:"error"`
|
||||
IsInvalidSignature bool `json:"is_invalid_signature,omitempty"` // M_INVALID_SIGNATURE
|
||||
IsMissingParam bool `json:"is_missing_param,omitempty"` // M_MISSING_PARAM
|
||||
}
|
||||
|
||||
func (k *KeyError) Error() string {
|
||||
|
@ -151,7 +155,30 @@ type PerformClaimKeysResponse struct {
|
|||
Error *KeyError
|
||||
}
|
||||
|
||||
type PerformUploadDeviceKeysRequest struct {
|
||||
gomatrixserverlib.CrossSigningKeys
|
||||
// The user that uploaded the key, should be populated by the clientapi.
|
||||
UserID string `json:"user_id"`
|
||||
}
|
||||
|
||||
type PerformUploadDeviceKeysResponse struct {
|
||||
Error *KeyError
|
||||
}
|
||||
|
||||
type PerformUploadDeviceSignaturesRequest struct {
|
||||
Signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice
|
||||
// The user that uploaded the sig, should be populated by the clientapi.
|
||||
UserID string `json:"user_id"`
|
||||
}
|
||||
|
||||
type PerformUploadDeviceSignaturesResponse struct {
|
||||
Error *KeyError
|
||||
}
|
||||
|
||||
type QueryKeysRequest struct {
|
||||
// The user ID asking for the keys, e.g. if from a client API request.
|
||||
// Will not be populated if the key request came from federation.
|
||||
UserID string
|
||||
// Maps user IDs to a list of devices
|
||||
UserToDevices map[string][]string
|
||||
Timeout time.Duration
|
||||
|
@ -162,6 +189,10 @@ type QueryKeysResponse struct {
|
|||
Failures map[string]interface{}
|
||||
// Map of user_id to device_id to device_key
|
||||
DeviceKeys map[string]map[string]json.RawMessage
|
||||
// Maps of user_id to cross signing key
|
||||
MasterKeys map[string]gomatrixserverlib.CrossSigningKey
|
||||
SelfSigningKeys map[string]gomatrixserverlib.CrossSigningKey
|
||||
UserSigningKeys map[string]gomatrixserverlib.CrossSigningKey
|
||||
// Set if there was a fatal error processing this query
|
||||
Error *KeyError
|
||||
}
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
package consumers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
type OutputSigningKeyUpdateConsumer struct {
|
||||
eduServerConsumer *internal.ContinualConsumer
|
||||
keyDB storage.Database
|
||||
keyAPI api.KeyInternalAPI
|
||||
serverName string
|
||||
}
|
||||
|
||||
func NewOutputSigningKeyUpdateConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.Dendrite,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
keyDB storage.Database,
|
||||
keyAPI api.KeyInternalAPI,
|
||||
) *OutputSigningKeyUpdateConsumer {
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "keyserver/eduserver",
|
||||
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: keyDB,
|
||||
}
|
||||
s := &OutputSigningKeyUpdateConsumer{
|
||||
eduServerConsumer: &consumer,
|
||||
keyDB: keyDB,
|
||||
keyAPI: keyAPI,
|
||||
serverName: string(cfg.Global.ServerName),
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *OutputSigningKeyUpdateConsumer) Start() error {
|
||||
return s.eduServerConsumer.Start()
|
||||
}
|
||||
|
||||
func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
/*
|
||||
var output eduapi.OutputSigningKeyUpdate
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
log.WithError(err).Errorf("eduserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
*/
|
||||
return fmt.Errorf("TODO")
|
||||
}
|
|
@ -0,0 +1,389 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func sanityCheckKey(key gomatrixserverlib.CrossSigningKey, userID string, purpose gomatrixserverlib.CrossSigningKeyPurpose) error {
|
||||
// Is there exactly one key?
|
||||
if len(key.Keys) != 1 {
|
||||
return fmt.Errorf("should contain exactly one key")
|
||||
}
|
||||
|
||||
// Does the key ID match the key value? Iterates exactly once
|
||||
for keyID, keyData := range key.Keys {
|
||||
b64 := keyData.Encode()
|
||||
tokens := strings.Split(string(keyID), ":")
|
||||
if len(tokens) != 2 {
|
||||
return fmt.Errorf("key ID is incorrectly formatted")
|
||||
}
|
||||
if tokens[1] != b64 {
|
||||
return fmt.Errorf("key ID isn't correct")
|
||||
}
|
||||
}
|
||||
|
||||
// Does the key claim to be from the right user?
|
||||
if userID != key.UserID {
|
||||
return fmt.Errorf("key has a user ID mismatch")
|
||||
}
|
||||
|
||||
// Does the key contain the correct purpose?
|
||||
useful := false
|
||||
for _, usage := range key.Usage {
|
||||
if usage == purpose {
|
||||
useful = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !useful {
|
||||
return fmt.Errorf("key does not contain correct usage purpose")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
|
||||
var masterKey gomatrixserverlib.Base64Bytes
|
||||
hasMasterKey := false
|
||||
|
||||
if len(req.MasterKey.Keys) > 0 {
|
||||
if err := sanityCheckKey(req.MasterKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "Master key sanity check failed: " + err.Error(),
|
||||
}
|
||||
return
|
||||
}
|
||||
hasMasterKey = true
|
||||
for _, keyData := range req.MasterKey.Keys { // iterates once, because sanityCheckKey requires one key
|
||||
masterKey = keyData
|
||||
}
|
||||
}
|
||||
|
||||
if len(req.SelfSigningKey.Keys) > 0 {
|
||||
if err := sanityCheckKey(req.SelfSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "Self-signing key sanity check failed: " + err.Error(),
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(req.UserSigningKey.Keys) > 0 {
|
||||
if err := sanityCheckKey(req.UserSigningKey, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeUserSigning); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "User-signing key sanity check failed: " + err.Error(),
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// If the user hasn't given a new master key, then let's go and get their
|
||||
// existing keys from the database.
|
||||
if !hasMasterKey {
|
||||
existingKeys, err := a.DB.CrossSigningKeysForUser(ctx, req.UserID)
|
||||
if err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "Retrieving cross-signing keys from database failed: " + err.Error(),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
masterKey, hasMasterKey = existingKeys[gomatrixserverlib.CrossSigningKeyPurposeMaster]
|
||||
}
|
||||
|
||||
// If the user isn't a local user and we haven't successfully found a key
|
||||
// through any local means then ask over federation.
|
||||
if !hasMasterKey {
|
||||
_, host, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "Retrieving cross-signing keys from federation failed: " + err.Error(),
|
||||
}
|
||||
return
|
||||
}
|
||||
keys, err := a.FedClient.QueryKeys(ctx, host, map[string][]string{
|
||||
req.UserID: {},
|
||||
})
|
||||
if err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "Retrieving cross-signing keys from federation failed: " + err.Error(),
|
||||
}
|
||||
return
|
||||
}
|
||||
switch k := keys.MasterKeys[req.UserID].CrossSigningBody.(type) {
|
||||
case *gomatrixserverlib.CrossSigningKey:
|
||||
if err := sanityCheckKey(*k, req.UserID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "Master key sanity check failed: " + err.Error(),
|
||||
}
|
||||
return
|
||||
}
|
||||
default:
|
||||
res.Error = &api.KeyError{
|
||||
Err: "Unexpected type for master key retrieved from federation",
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// If we still don't have a master key at this point then there's nothing else
|
||||
// we can do - we've checked both the request and the database.
|
||||
if !hasMasterKey {
|
||||
res.Error = &api.KeyError{
|
||||
Err: "No master key was found, either in the database or in the request!",
|
||||
IsMissingParam: true,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// The key ID is basically the key itself.
|
||||
masterKeyID := gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", masterKey.Encode()))
|
||||
|
||||
// Work out which things we need to verify the signatures for.
|
||||
toVerify := make(map[gomatrixserverlib.CrossSigningKeyPurpose]gomatrixserverlib.CrossSigningKey, 3)
|
||||
toStore := types.CrossSigningKeyMap{}
|
||||
if len(req.MasterKey.Keys) > 0 {
|
||||
toVerify[gomatrixserverlib.CrossSigningKeyPurposeMaster] = req.MasterKey
|
||||
}
|
||||
if len(req.SelfSigningKey.Keys) > 0 {
|
||||
toVerify[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning] = req.SelfSigningKey
|
||||
}
|
||||
if len(req.UserSigningKey.Keys) > 0 {
|
||||
toVerify[gomatrixserverlib.CrossSigningKeyPurposeUserSigning] = req.UserSigningKey
|
||||
}
|
||||
for purpose, key := range toVerify {
|
||||
// Collect together the key IDs we need to verify with. This will include
|
||||
// all of the key IDs specified in the signatures. We don't do this for
|
||||
// the master key because we have no means to verify the signatures - we
|
||||
// instead just need to store them.
|
||||
if purpose != gomatrixserverlib.CrossSigningKeyPurposeMaster {
|
||||
// Marshal the specific key back into JSON so that we can verify the
|
||||
// signature of it.
|
||||
keyJSON, err := json.Marshal(key)
|
||||
if err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: fmt.Sprintf("The JSON of the key section is invalid: %s", err.Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Now check if the subkey is signed by the master key.
|
||||
if err := gomatrixserverlib.VerifyJSON(req.UserID, masterKeyID, ed25519.PublicKey(masterKey), keyJSON); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: fmt.Sprintf("The %q sub-key failed master key signature verification: %s", purpose, err.Error()),
|
||||
IsInvalidSignature: true,
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// If we've reached this point then all the signatures are valid so
|
||||
// add the key to the list of keys to store.
|
||||
for _, keyData := range key.Keys { // iterates once, see sanityCheckKey
|
||||
toStore[purpose] = keyData
|
||||
}
|
||||
}
|
||||
|
||||
if err := a.DB.StoreCrossSigningKeysForUser(ctx, req.UserID, toStore); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: fmt.Sprintf("a.DB.StoreCrossSigningKeysForUser: %s", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req *api.PerformUploadDeviceSignaturesRequest, res *api.PerformUploadDeviceSignaturesResponse) {
|
||||
selfSignatures := map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{}
|
||||
otherSignatures := map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{}
|
||||
|
||||
for userID, forUserID := range req.Signatures {
|
||||
for keyID, keyOrDevice := range forUserID {
|
||||
switch key := keyOrDevice.CrossSigningBody.(type) {
|
||||
case *gomatrixserverlib.CrossSigningKey:
|
||||
if key.UserID == req.UserID {
|
||||
if _, ok := selfSignatures[userID]; !ok {
|
||||
selfSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{}
|
||||
}
|
||||
selfSignatures[userID][keyID] = keyOrDevice
|
||||
} else {
|
||||
if _, ok := otherSignatures[userID]; !ok {
|
||||
otherSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{}
|
||||
}
|
||||
otherSignatures[userID][keyID] = keyOrDevice
|
||||
}
|
||||
|
||||
case *gomatrixserverlib.DeviceKeys:
|
||||
if key.UserID == req.UserID {
|
||||
if _, ok := selfSignatures[userID]; !ok {
|
||||
selfSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{}
|
||||
}
|
||||
selfSignatures[userID][keyID] = keyOrDevice
|
||||
} else {
|
||||
if _, ok := otherSignatures[userID]; !ok {
|
||||
otherSignatures[userID] = map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice{}
|
||||
}
|
||||
otherSignatures[userID][keyID] = keyOrDevice
|
||||
}
|
||||
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := a.processSelfSignatures(ctx, req.UserID, selfSignatures); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: fmt.Sprintf("a.processSelfSignatures: %s", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err := a.processOtherSignatures(ctx, req.UserID, otherSignatures); err != nil {
|
||||
res.Error = &api.KeyError{
|
||||
Err: fmt.Sprintf("a.processOtherSignatures: %s", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) processSelfSignatures(
|
||||
ctx context.Context, _ string,
|
||||
signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice,
|
||||
) error {
|
||||
// Here we will process:
|
||||
// * The user signing their own devices using their self-signing key
|
||||
// * The user signing their master key using one of their devices
|
||||
|
||||
for targetUserID, forTargetUserID := range signatures {
|
||||
for targetKeyID, signature := range forTargetUserID {
|
||||
switch sig := signature.CrossSigningBody.(type) {
|
||||
case *gomatrixserverlib.CrossSigningKey:
|
||||
for originUserID, forOriginUserID := range sig.Signatures {
|
||||
for originKeyID, originSig := range forOriginUserID {
|
||||
if err := a.DB.StoreCrossSigningSigsForTarget(
|
||||
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
|
||||
); err != nil {
|
||||
return fmt.Errorf("a.DB.StoreCrossSigningKeysForTarget: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case *gomatrixserverlib.DeviceKeys:
|
||||
for originUserID, forOriginUserID := range sig.Signatures {
|
||||
for originKeyID, originSig := range forOriginUserID {
|
||||
if err := a.DB.StoreCrossSigningSigsForTarget(
|
||||
ctx, originUserID, originKeyID, targetUserID, targetKeyID, originSig,
|
||||
); err != nil {
|
||||
return fmt.Errorf("a.DB.StoreCrossSigningKeysForTarget: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unexpected type assertion")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) processOtherSignatures(
|
||||
ctx context.Context, userID string,
|
||||
signatures map[string]map[gomatrixserverlib.KeyID]gomatrixserverlib.CrossSigningForKeyOrDevice,
|
||||
) error {
|
||||
// Here we will process:
|
||||
// * A user signing someone else's master keys using their user-signing keys
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) crossSigningKeysFromDatabase(
|
||||
ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse,
|
||||
) {
|
||||
for userID := range req.UserToDevices {
|
||||
keys, err := a.DB.CrossSigningKeysForUser(ctx, userID)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to get cross-signing keys for user %q", userID)
|
||||
continue
|
||||
}
|
||||
|
||||
for keyType, keyData := range keys {
|
||||
b64 := keyData.Encode()
|
||||
keyID := gomatrixserverlib.KeyID("ed25519:" + b64)
|
||||
key := gomatrixserverlib.CrossSigningKey{
|
||||
UserID: userID,
|
||||
Usage: []gomatrixserverlib.CrossSigningKeyPurpose{
|
||||
keyType,
|
||||
},
|
||||
Keys: map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes{
|
||||
keyID: keyData,
|
||||
},
|
||||
}
|
||||
|
||||
sigs, err := a.DB.CrossSigningSigsForTarget(ctx, userID, keyID)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to get cross-signing signatures for user %q key %q", userID, keyID)
|
||||
continue
|
||||
}
|
||||
|
||||
appendSignature := func(originUserID string, originKeyID gomatrixserverlib.KeyID, signature gomatrixserverlib.Base64Bytes) {
|
||||
if key.Signatures == nil {
|
||||
key.Signatures = types.CrossSigningSigMap{}
|
||||
}
|
||||
if _, ok := key.Signatures[originUserID]; !ok {
|
||||
key.Signatures[originUserID] = make(map[gomatrixserverlib.KeyID]gomatrixserverlib.Base64Bytes)
|
||||
}
|
||||
key.Signatures[originUserID][originKeyID] = signature
|
||||
}
|
||||
|
||||
for originUserID, forOrigin := range sigs {
|
||||
for originKeyID, signature := range forOrigin {
|
||||
switch {
|
||||
case req.UserID != "" && originUserID == req.UserID:
|
||||
// Include signatures that we created
|
||||
appendSignature(originUserID, originKeyID, signature)
|
||||
case originUserID == userID:
|
||||
// Include signatures that were created by the person whose key
|
||||
// we are processing
|
||||
appendSignature(originUserID, originKeyID, signature)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch keyType {
|
||||
case gomatrixserverlib.CrossSigningKeyPurposeMaster:
|
||||
res.MasterKeys[userID] = key
|
||||
|
||||
case gomatrixserverlib.CrossSigningKeyPurposeSelfSigning:
|
||||
res.SelfSigningKeys[userID] = key
|
||||
|
||||
case gomatrixserverlib.CrossSigningKeyPurposeUserSigning:
|
||||
res.UserSigningKeys[userID] = key
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -82,6 +82,7 @@ type DeviceListUpdater struct {
|
|||
mu *sync.Mutex // protects UserIDToMutex
|
||||
|
||||
db DeviceListUpdaterDatabase
|
||||
api DeviceListUpdaterAPI
|
||||
producer KeyChangeProducer
|
||||
fedClient fedsenderapi.FederationClient
|
||||
workerChans []chan gomatrixserverlib.ServerName
|
||||
|
@ -114,6 +115,10 @@ type DeviceListUpdaterDatabase interface {
|
|||
DeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) error
|
||||
}
|
||||
|
||||
type DeviceListUpdaterAPI interface {
|
||||
PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse)
|
||||
}
|
||||
|
||||
// KeyChangeProducer is the interface for producers.KeyChange useful for testing.
|
||||
type KeyChangeProducer interface {
|
||||
ProduceKeyChanges(keys []api.DeviceMessage) error
|
||||
|
@ -121,13 +126,14 @@ type KeyChangeProducer interface {
|
|||
|
||||
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
|
||||
func NewDeviceListUpdater(
|
||||
db DeviceListUpdaterDatabase, producer KeyChangeProducer, fedClient fedsenderapi.FederationClient,
|
||||
numWorkers int,
|
||||
db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
||||
fedClient fedsenderapi.FederationClient, numWorkers int,
|
||||
) *DeviceListUpdater {
|
||||
return &DeviceListUpdater{
|
||||
userIDToMutex: make(map[string]*sync.Mutex),
|
||||
mu: &sync.Mutex{},
|
||||
db: db,
|
||||
api: api,
|
||||
producer: producer,
|
||||
fedClient: fedClient,
|
||||
workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
|
||||
|
@ -367,6 +373,23 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
|||
}
|
||||
continue
|
||||
}
|
||||
if res.MasterKey != nil || res.SelfSigningKey != nil {
|
||||
uploadReq := &api.PerformUploadDeviceKeysRequest{
|
||||
UserID: userID,
|
||||
}
|
||||
uploadRes := &api.PerformUploadDeviceKeysResponse{}
|
||||
if res.MasterKey != nil {
|
||||
if err = sanityCheckKey(*res.MasterKey, userID, gomatrixserverlib.CrossSigningKeyPurposeMaster); err == nil {
|
||||
uploadReq.MasterKey = *res.MasterKey
|
||||
}
|
||||
}
|
||||
if res.SelfSigningKey != nil {
|
||||
if err = sanityCheckKey(*res.SelfSigningKey, userID, gomatrixserverlib.CrossSigningKeyPurposeSelfSigning); err == nil {
|
||||
uploadReq.SelfSigningKey = *res.SelfSigningKey
|
||||
}
|
||||
}
|
||||
u.api.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes)
|
||||
}
|
||||
err = u.updateDeviceList(&res)
|
||||
if err != nil {
|
||||
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
|
||||
|
|
|
@ -95,6 +95,13 @@ func (d *mockDeviceListUpdaterDatabase) DeviceKeysJSON(ctx context.Context, keys
|
|||
return nil
|
||||
}
|
||||
|
||||
type mockDeviceListUpdaterAPI struct {
|
||||
}
|
||||
|
||||
func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
|
||||
|
||||
}
|
||||
|
||||
type roundTripper struct {
|
||||
fn func(*http.Request) (*http.Response, error)
|
||||
}
|
||||
|
@ -122,8 +129,9 @@ func TestUpdateHavePrevID(t *testing.T) {
|
|||
return true
|
||||
},
|
||||
}
|
||||
ap := &mockDeviceListUpdaterAPI{}
|
||||
producer := &mockKeyChangeProducer{}
|
||||
updater := NewDeviceListUpdater(db, producer, nil, 1)
|
||||
updater := NewDeviceListUpdater(db, ap, producer, nil, 1)
|
||||
event := gomatrixserverlib.DeviceListUpdateEvent{
|
||||
DeviceDisplayName: "Foo Bar",
|
||||
Deleted: false,
|
||||
|
@ -166,6 +174,7 @@ func TestUpdateNoPrevID(t *testing.T) {
|
|||
return false
|
||||
},
|
||||
}
|
||||
ap := &mockDeviceListUpdaterAPI{}
|
||||
producer := &mockKeyChangeProducer{}
|
||||
remoteUserID := "@alice:example.somewhere"
|
||||
var wg sync.WaitGroup
|
||||
|
@ -193,7 +202,7 @@ func TestUpdateNoPrevID(t *testing.T) {
|
|||
`)),
|
||||
}, nil
|
||||
})
|
||||
updater := NewDeviceListUpdater(db, producer, fedClient, 2)
|
||||
updater := NewDeviceListUpdater(db, ap, producer, fedClient, 2)
|
||||
if err := updater.Start(); err != nil {
|
||||
t.Fatalf("failed to start updater: %s", err)
|
||||
}
|
||||
|
|
|
@ -221,9 +221,17 @@ func (a *KeyInternalAPI) QueryDeviceMessages(ctx context.Context, req *api.Query
|
|||
|
||||
func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) {
|
||||
res.DeviceKeys = make(map[string]map[string]json.RawMessage)
|
||||
res.MasterKeys = make(map[string]gomatrixserverlib.CrossSigningKey)
|
||||
res.SelfSigningKeys = make(map[string]gomatrixserverlib.CrossSigningKey)
|
||||
res.UserSigningKeys = make(map[string]gomatrixserverlib.CrossSigningKey)
|
||||
res.Failures = make(map[string]interface{})
|
||||
|
||||
// get cross-signing keys from the database
|
||||
a.crossSigningKeysFromDatabase(ctx, req, res)
|
||||
|
||||
// make a map from domain to device keys
|
||||
domainToDeviceKeys := make(map[string]map[string][]string)
|
||||
domainToCrossSigningKeys := make(map[string]map[string]struct{})
|
||||
for userID, deviceIDs := range req.UserToDevices {
|
||||
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
|
@ -274,16 +282,30 @@ func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysReques
|
|||
domainToDeviceKeys[domain] = make(map[string][]string)
|
||||
domainToDeviceKeys[domain][userID] = append(domainToDeviceKeys[domain][userID], deviceIDs...)
|
||||
}
|
||||
// work out if our cross-signing request for this user was
|
||||
// satisfied, if not add them to the list of things to fetch
|
||||
if _, ok := res.MasterKeys[userID]; !ok {
|
||||
if _, ok := domainToCrossSigningKeys[domain]; !ok {
|
||||
domainToCrossSigningKeys[domain] = make(map[string]struct{})
|
||||
}
|
||||
domainToCrossSigningKeys[domain][userID] = struct{}{}
|
||||
}
|
||||
if _, ok := res.SelfSigningKeys[userID]; !ok {
|
||||
if _, ok := domainToCrossSigningKeys[domain]; !ok {
|
||||
domainToCrossSigningKeys[domain] = make(map[string]struct{})
|
||||
}
|
||||
domainToCrossSigningKeys[domain][userID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// attempt to satisfy key queries from the local database first as we should get device updates pushed to us
|
||||
domainToDeviceKeys = a.remoteKeysFromDatabase(ctx, res, domainToDeviceKeys)
|
||||
if len(domainToDeviceKeys) == 0 {
|
||||
if len(domainToDeviceKeys) == 0 && len(domainToCrossSigningKeys) == 0 {
|
||||
return // nothing to query
|
||||
}
|
||||
|
||||
// perform key queries for remote devices
|
||||
a.queryRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys)
|
||||
a.queryRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys, domainToCrossSigningKeys)
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) remoteKeysFromDatabase(
|
||||
|
@ -313,18 +335,30 @@ func (a *KeyInternalAPI) remoteKeysFromDatabase(
|
|||
}
|
||||
|
||||
func (a *KeyInternalAPI) queryRemoteKeys(
|
||||
ctx context.Context, timeout time.Duration, res *api.QueryKeysResponse, domainToDeviceKeys map[string]map[string][]string,
|
||||
ctx context.Context, timeout time.Duration, res *api.QueryKeysResponse,
|
||||
domainToDeviceKeys map[string]map[string][]string, domainToCrossSigningKeys map[string]map[string]struct{},
|
||||
) {
|
||||
resultCh := make(chan *gomatrixserverlib.RespQueryKeys, len(domainToDeviceKeys))
|
||||
// allows us to wait until all federation servers have been poked
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(domainToDeviceKeys))
|
||||
// mutex for writing directly to res (e.g failures)
|
||||
var respMu sync.Mutex
|
||||
|
||||
domains := map[string]struct{}{}
|
||||
for domain := range domainToDeviceKeys {
|
||||
domains[domain] = struct{}{}
|
||||
}
|
||||
for domain := range domainToCrossSigningKeys {
|
||||
domains[domain] = struct{}{}
|
||||
}
|
||||
wg.Add(len(domains))
|
||||
|
||||
// fan out
|
||||
for domain, deviceKeys := range domainToDeviceKeys {
|
||||
go a.queryRemoteKeysOnServer(ctx, domain, deviceKeys, &wg, &respMu, timeout, resultCh, res)
|
||||
for domain := range domains {
|
||||
go a.queryRemoteKeysOnServer(
|
||||
ctx, domain, domainToDeviceKeys[domain], domainToCrossSigningKeys[domain],
|
||||
&wg, &respMu, timeout, resultCh, res,
|
||||
)
|
||||
}
|
||||
|
||||
// Close the result channel when the goroutines have quit so the for .. range exits
|
||||
|
@ -344,12 +378,29 @@ func (a *KeyInternalAPI) queryRemoteKeys(
|
|||
res.DeviceKeys[userID][deviceID] = keyJSON
|
||||
}
|
||||
}
|
||||
|
||||
for userID, body := range result.MasterKeys {
|
||||
switch b := body.CrossSigningBody.(type) {
|
||||
case *gomatrixserverlib.CrossSigningKey:
|
||||
res.MasterKeys[userID] = *b
|
||||
}
|
||||
}
|
||||
|
||||
for userID, body := range result.SelfSigningKeys {
|
||||
switch b := body.CrossSigningBody.(type) {
|
||||
case *gomatrixserverlib.CrossSigningKey:
|
||||
res.SelfSigningKeys[userID] = *b
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: do we want to persist these somewhere now
|
||||
// that we have fetched them?
|
||||
}
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) queryRemoteKeysOnServer(
|
||||
ctx context.Context, serverName string, devKeys map[string][]string, wg *sync.WaitGroup,
|
||||
respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys,
|
||||
ctx context.Context, serverName string, devKeys map[string][]string, crossSigningKeys map[string]struct{},
|
||||
wg *sync.WaitGroup, respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys,
|
||||
res *api.QueryKeysResponse,
|
||||
) {
|
||||
defer wg.Done()
|
||||
|
@ -358,14 +409,24 @@ func (a *KeyInternalAPI) queryRemoteKeysOnServer(
|
|||
// for users who we do not have any knowledge about, try to start doing device list updates for them
|
||||
// by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but
|
||||
// lack a stream ID.
|
||||
var userIDsForAllDevices []string
|
||||
userIDsForAllDevices := map[string]struct{}{}
|
||||
for userID, deviceIDs := range devKeys {
|
||||
if len(deviceIDs) == 0 {
|
||||
userIDsForAllDevices = append(userIDsForAllDevices, userID)
|
||||
userIDsForAllDevices[userID] = struct{}{}
|
||||
delete(devKeys, userID)
|
||||
}
|
||||
}
|
||||
for _, userID := range userIDsForAllDevices {
|
||||
// for cross-signing keys, it's probably easier just to hit /keys/query if we aren't already doing
|
||||
// a device list update, so we'll populate those back into the /keys/query list if not
|
||||
for userID := range crossSigningKeys {
|
||||
if devKeys == nil {
|
||||
devKeys = map[string][]string{}
|
||||
}
|
||||
if _, ok := userIDsForAllDevices[userID]; !ok {
|
||||
devKeys[userID] = []string{}
|
||||
}
|
||||
}
|
||||
for userID := range userIDsForAllDevices {
|
||||
err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
|
|
|
@ -27,13 +27,15 @@ import (
|
|||
|
||||
// HTTP paths for the internal HTTP APIs
|
||||
const (
|
||||
InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate"
|
||||
PerformUploadKeysPath = "/keyserver/performUploadKeys"
|
||||
PerformClaimKeysPath = "/keyserver/performClaimKeys"
|
||||
QueryKeysPath = "/keyserver/queryKeys"
|
||||
QueryKeyChangesPath = "/keyserver/queryKeyChanges"
|
||||
QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys"
|
||||
QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages"
|
||||
InputDeviceListUpdatePath = "/keyserver/inputDeviceListUpdate"
|
||||
PerformUploadKeysPath = "/keyserver/performUploadKeys"
|
||||
PerformClaimKeysPath = "/keyserver/performClaimKeys"
|
||||
PerformUploadDeviceKeysPath = "/keyserver/performUploadDeviceKeys"
|
||||
PerformUploadDeviceSignaturesPath = "/keyserver/performUploadDeviceSignatures"
|
||||
QueryKeysPath = "/keyserver/queryKeys"
|
||||
QueryKeyChangesPath = "/keyserver/queryKeyChanges"
|
||||
QueryOneTimeKeysPath = "/keyserver/queryOneTimeKeys"
|
||||
QueryDeviceMessagesPath = "/keyserver/queryDeviceMessages"
|
||||
)
|
||||
|
||||
// NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API.
|
||||
|
@ -175,3 +177,37 @@ func (h *httpKeyInternalAPI) QueryKeyChanges(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpKeyInternalAPI) PerformUploadDeviceKeys(
|
||||
ctx context.Context,
|
||||
request *api.PerformUploadDeviceKeysRequest,
|
||||
response *api.PerformUploadDeviceKeysResponse,
|
||||
) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUploadDeviceKeys")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + PerformUploadDeviceKeysPath
|
||||
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
if err != nil {
|
||||
response.Error = &api.KeyError{
|
||||
Err: err.Error(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpKeyInternalAPI) PerformUploadDeviceSignatures(
|
||||
ctx context.Context,
|
||||
request *api.PerformUploadDeviceSignaturesRequest,
|
||||
response *api.PerformUploadDeviceSignaturesResponse,
|
||||
) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUploadDeviceSignatures")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.apiURL + PerformUploadDeviceSignaturesPath
|
||||
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
if err != nil {
|
||||
response.Error = &api.KeyError{
|
||||
Err: err.Error(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,6 +58,28 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(PerformUploadDeviceKeysPath,
|
||||
httputil.MakeInternalAPI("performUploadDeviceKeys", func(req *http.Request) util.JSONResponse {
|
||||
request := api.PerformUploadDeviceKeysRequest{}
|
||||
response := api.PerformUploadDeviceKeysResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request.CrossSigningKeys); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
s.PerformUploadDeviceKeys(req.Context(), &request, &response)
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(PerformUploadDeviceSignaturesPath,
|
||||
httputil.MakeInternalAPI("performUploadDeviceSignatures", func(req *http.Request) util.JSONResponse {
|
||||
request := api.PerformUploadDeviceSignaturesRequest{}
|
||||
response := api.PerformUploadDeviceSignaturesResponse{}
|
||||
if err := json.NewDecoder(req.Body).Decode(&request.Signatures); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
s.PerformUploadDeviceSignatures(req.Context(), &request, &response)
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(QueryKeysPath,
|
||||
httputil.MakeInternalAPI("queryKeys", func(req *http.Request) util.JSONResponse {
|
||||
request := api.QueryKeysRequest{}
|
||||
|
|
|
@ -18,10 +18,12 @@ import (
|
|||
"github.com/gorilla/mux"
|
||||
fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/consumers"
|
||||
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/keyserver/producers"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/kafka"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -36,9 +38,9 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
|
|||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(
|
||||
cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
|
||||
base *setup.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
|
||||
) api.KeyInternalAPI {
|
||||
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||
consumer, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||
|
||||
db, err := storage.NewDatabase(&cfg.Database)
|
||||
if err != nil {
|
||||
|
@ -49,17 +51,26 @@ func NewInternalAPI(
|
|||
Producer: producer,
|
||||
DB: db,
|
||||
}
|
||||
updater := internal.NewDeviceListUpdater(db, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||
ap := &internal.KeyInternalAPI{
|
||||
DB: db,
|
||||
ThisServer: cfg.Matrix.ServerName,
|
||||
FedClient: fedClient,
|
||||
Producer: keyChangeProducer,
|
||||
}
|
||||
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||
ap.Updater = updater
|
||||
go func() {
|
||||
if err := updater.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start device list updater")
|
||||
}
|
||||
}()
|
||||
return &internal.KeyInternalAPI{
|
||||
DB: db,
|
||||
ThisServer: cfg.Matrix.ServerName,
|
||||
FedClient: fedClient,
|
||||
Producer: keyChangeProducer,
|
||||
Updater: updater,
|
||||
|
||||
keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer(
|
||||
base.ProcessContext, base.Cfg, consumer, db, ap,
|
||||
)
|
||||
if err := keyconsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
|
||||
}
|
||||
|
||||
return ap
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ const (
|
|||
TopicOutputRoomEvent = "OutputRoomEvent"
|
||||
TopicOutputClientData = "OutputClientData"
|
||||
TopicOutputReceiptEvent = "OutputReceiptEvent"
|
||||
TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate"
|
||||
)
|
||||
|
||||
type Kafka struct {
|
||||
|
|
|
@ -33,6 +33,10 @@ func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {}
|
|||
// PerformClaimKeys claims one-time keys for use in pre-key messages
|
||||
func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) {
|
||||
}
|
||||
func (k *mockKeyAPI) PerformUploadDeviceKeys(ctx context.Context, req *keyapi.PerformUploadDeviceKeysRequest, res *keyapi.PerformUploadDeviceKeysResponse) {
|
||||
}
|
||||
func (k *mockKeyAPI) PerformUploadDeviceSignatures(ctx context.Context, req *keyapi.PerformUploadDeviceSignaturesRequest, res *keyapi.PerformUploadDeviceSignaturesResponse) {
|
||||
}
|
||||
func (k *mockKeyAPI) QueryKeys(ctx context.Context, req *keyapi.QueryKeysRequest, res *keyapi.QueryKeysResponse) {
|
||||
}
|
||||
func (k *mockKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyChangesRequest, res *keyapi.QueryKeyChangesResponse) {
|
||||
|
|
|
@ -550,3 +550,6 @@ Will not back up to an old backup version
|
|||
Can create more than 10 backup versions
|
||||
Can delete backup
|
||||
Deleted & recreated backups are empty
|
||||
Can upload self-signing keys
|
||||
Fails to upload self-signing keys with no auth
|
||||
Fails to upload self-signing key without master key
|
||||
|
|
Loading…
Reference in New Issue