From 83391da0e04dda7a52589ee7ec6df2b615571894 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 16 Jun 2020 17:05:38 +0100 Subject: [PATCH] Make syncapi use userapi (#1136) * Make syncapi use userapi * Unbreak things * Fix tests * Lint --- cmd/dendrite-sync-api-server/main.go | 3 +- docs/WIRING-Current.md | 2 ++ internal/setup/monolith.go | 2 +- syncapi/sync/requestpool.go | 45 +++++++++++++++------------- syncapi/syncapi.go | 4 +-- userapi/api/api.go | 23 +++++++++++++- userapi/internal/api.go | 33 ++++++++++++++++++++ userapi/inthttp/client.go | 9 ++++++ userapi/inthttp/server.go | 13 ++++++++ 9 files changed, 106 insertions(+), 28 deletions(-) diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index a17b648d..d67395fb 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -25,12 +25,11 @@ func main() { defer base.Close() // nolint: errcheck userAPI := base.UserAPIClient() - accountDB := base.CreateAccountsDB() federation := base.CreateFederationClient() rsAPI := base.RoomserverHTTPClient() - syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, accountDB, rsAPI, federation, cfg) + syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) diff --git a/docs/WIRING-Current.md b/docs/WIRING-Current.md index 62450f2f..ec539d4e 100644 --- a/docs/WIRING-Current.md +++ b/docs/WIRING-Current.md @@ -39,6 +39,8 @@ Internal only | `------------------- - 12 (FedSender -> ServerKeyAPI): Verifying event signatures of responses (e.g from send_join) - 13 (Roomserver -> ServerKeyAPI): Verifying event signatures of backfilled events +In addition to this, all public facing components (Tier 1) talk to the `UserAPI` to verify access tokens and extract profile information where needed. + ## Kafka logs ``` diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index aec14aa7..bb81f740 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -87,6 +87,6 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { m.ExtPublicRoomsProvider, ) syncapi.AddPublicRoutes( - publicMux, m.KafkaConsumer, m.UserAPI, m.AccountDB, m.RoomserverAPI, m.FedClient, m.Config, + publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config, ) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index ec22a05f..26b925ea 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -21,7 +21,6 @@ import ( "net/http" "time" - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" @@ -33,14 +32,14 @@ import ( // RequestPool manages HTTP long-poll connections for /sync type RequestPool struct { - db storage.Database - accountDB accounts.Database - notifier *Notifier + db storage.Database + userAPI userapi.UserInternalAPI + notifier *Notifier } // NewRequestPool makes a new RequestPool -func NewRequestPool(db storage.Database, n *Notifier, adb accounts.Database) *RequestPool { - return &RequestPool{db, adb, n} +func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool { + return &RequestPool{db, userAPI, n} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -193,6 +192,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea return } +// nolint:gocyclo func (rp *RequestPool) appendAccountData( data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, accountDataFilter *gomatrixserverlib.EventFilter, @@ -202,25 +202,21 @@ func (rp *RequestPool) appendAccountData( // data keys were set between two message. This isn't a huge issue since the // duplicate data doesn't represent a huge quantity of data, but an optimisation // here would be making sure each data is sent only once to the client. - localpart, _, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - return nil, err - } - if req.since == nil { // If this is the initial sync, we don't need to check if a data has // already been sent. Instead, we send the whole batch. - var global []gomatrixserverlib.ClientEvent - var rooms map[string][]gomatrixserverlib.ClientEvent - global, rooms, err = rp.accountDB.GetAccountData(req.ctx, localpart) + var res userapi.QueryAccountDataResponse + err := rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{ + UserID: userID, + }, &res) if err != nil { return nil, err } - data.AccountData.Events = global + data.AccountData.Events = res.GlobalAccountData for r, j := range data.Rooms.Join { - if len(rooms[r]) > 0 { - j.AccountData.Events = rooms[r] + if len(res.RoomAccountData[r]) > 0 { + j.AccountData.Events = res.RoomAccountData[r] data.Rooms.Join[r] = j } } @@ -256,13 +252,20 @@ func (rp *RequestPool) appendAccountData( events := []gomatrixserverlib.ClientEvent{} // Request the missing data from the database for _, dataType := range dataTypes { - event, err := rp.accountDB.GetAccountDataByType( - req.ctx, localpart, roomID, dataType, - ) + var res userapi.QueryAccountDataResponse + err = rp.userAPI.QueryAccountData(req.ctx, &userapi.QueryAccountDataRequest{ + UserID: userID, + RoomID: roomID, + DataType: dataType, + }, &res) if err != nil { return nil, err } - events = append(events, *event) + if len(res.RoomAccountData[roomID]) > 0 { + events = append(events, res.RoomAccountData[roomID]...) + } else if len(res.GlobalAccountData) > 0 { + events = append(events, res.GlobalAccountData...) + } } // Append the data to the response diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 2351ee4d..caf91e27 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -21,7 +21,6 @@ import ( "github.com/gorilla/mux" "github.com/sirupsen/logrus" - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -39,7 +38,6 @@ func AddPublicRoutes( router *mux.Router, consumer sarama.Consumer, userAPI userapi.UserInternalAPI, - accountsDB accounts.Database, rsAPI api.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, @@ -60,7 +58,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start notifier") } - requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB) + requestPool := sync.NewRequestPool(syncDB, notifier, userAPI) roomConsumer := consumers.NewOutputRoomEventConsumer( cfg, consumer, notifier, syncDB, rsAPI, diff --git a/userapi/api/api.go b/userapi/api/api.go index 3ed9252c..1578268a 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -14,13 +14,18 @@ package api -import "context" +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" +) // UserInternalAPI is the internal API for information about users and devices. type UserInternalAPI interface { QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error + QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error } // QueryAccessTokenRequest is the request for QueryAccessToken @@ -37,6 +42,22 @@ type QueryAccessTokenResponse struct { Err error // e.g ErrorForbidden } +// QueryAccountDataRequest is the request for QueryAccountData +type QueryAccountDataRequest struct { + UserID string // required: the user to get account data for. + // TODO: This is a terribly confusing API shape :/ + DataType string // optional: if specified returns only a single event matching this data type. + // optional: Only used if DataType is set. If blank returns global account data matching the data type. + // If set, returns only room account data matching this data type. + RoomID string +} + +// QueryAccountDataResponse is the response for QueryAccountData +type QueryAccountDataResponse struct { + GlobalAccountData []gomatrixserverlib.ClientEvent + RoomAccountData map[string][]gomatrixserverlib.ClientEvent +} + // QueryDevicesRequest is the request for QueryDevices type QueryDevicesRequest struct { UserID string diff --git a/userapi/internal/api.go b/userapi/internal/api.go index d8dec11a..6e737b81 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -73,6 +73,39 @@ func (a *UserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDevice return nil } +func (a *UserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error { + local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return err + } + if domain != a.ServerName { + return fmt.Errorf("cannot query account data of remote users: got %s want %s", domain, a.ServerName) + } + if req.DataType != "" { + var event *gomatrixserverlib.ClientEvent + event, err = a.AccountDB.GetAccountDataByType(ctx, local, req.RoomID, req.DataType) + if err != nil { + return err + } + if event != nil { + if req.RoomID != "" { + res.RoomAccountData = make(map[string][]gomatrixserverlib.ClientEvent) + res.RoomAccountData[req.RoomID] = []gomatrixserverlib.ClientEvent{*event} + } else { + res.GlobalAccountData = append(res.GlobalAccountData, *event) + } + } + return nil + } + global, rooms, err := a.AccountDB.GetAccountData(ctx, local) + if err != nil { + return err + } + res.RoomAccountData = rooms + res.GlobalAccountData = global + return nil +} + func (a *UserInternalAPI) QueryAccessToken(ctx context.Context, req *api.QueryAccessTokenRequest, res *api.QueryAccessTokenResponse) error { if req.AppServiceUserID != "" { appServiceDevice, err := a.queryAppServiceToken(ctx, req.AccessToken, req.AppServiceUserID) diff --git a/userapi/inthttp/client.go b/userapi/inthttp/client.go index 638a7e9b..48e6d7d7 100644 --- a/userapi/inthttp/client.go +++ b/userapi/inthttp/client.go @@ -29,6 +29,7 @@ const ( QueryProfilePath = "/userapi/queryProfile" QueryAccessTokenPath = "/userapi/queryAccessToken" QueryDevicesPath = "/userapi/queryDevices" + QueryAccountDataPath = "/userapi/queryAccountData" ) // NewUserAPIClient creates a UserInternalAPI implemented by talking to a HTTP POST API. @@ -82,3 +83,11 @@ func (h *httpUserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDe apiURL := h.apiURL + QueryDevicesPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) } + +func (h *httpUserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryAccountData") + defer span.Finish() + + apiURL := h.apiURL + QueryAccountDataPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) +} diff --git a/userapi/inthttp/server.go b/userapi/inthttp/server.go index 19b0e40b..8bf2efc0 100644 --- a/userapi/inthttp/server.go +++ b/userapi/inthttp/server.go @@ -64,4 +64,17 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(QueryAccountDataPath, + httputil.MakeInternalAPI("queryAccountData", func(req *http.Request) util.JSONResponse { + request := api.QueryAccountDataRequest{} + response := api.QueryAccountDataResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := s.QueryAccountData(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) }