diff --git a/currentstateserver/api/api.go b/currentstateserver/api/api.go index a25ba48e..2e068d95 100644 --- a/currentstateserver/api/api.go +++ b/currentstateserver/api/api.go @@ -25,18 +25,6 @@ type CurrentStateInternalAPI interface { QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error // QueryBulkStateContent does a bulk query for state event content in the given rooms. QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error - // QuerySharedUsers returns a list of users who share at least 1 room in common with the given user. - QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error -} - -type QuerySharedUsersRequest struct { - UserID string - ExcludeRoomIDs []string - IncludeRoomIDs []string -} - -type QuerySharedUsersResponse struct { - UserIDsToCount map[string]int } type QueryRoomsForUserRequest struct { diff --git a/currentstateserver/currentstateserver_test.go b/currentstateserver/currentstateserver_test.go deleted file mode 100644 index 4915b36c..00000000 --- a/currentstateserver/currentstateserver_test.go +++ /dev/null @@ -1,294 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package currentstateserver - -import ( - "context" - "crypto/ed25519" - "encoding/json" - "fmt" - "net/http" - "os" - "reflect" - "testing" - "time" - - "github.com/Shopify/sarama" - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/currentstateserver/api" - "github.com/matrix-org/dendrite/currentstateserver/internal" - "github.com/matrix-org/dendrite/currentstateserver/inthttp" - "github.com/matrix-org/dendrite/currentstateserver/storage" - "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/internal/test" - roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" -) - -var ( - testRoomVersion = gomatrixserverlib.RoomVersionV1 - testData = []json.RawMessage{ - []byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$0ok8ynDp7kjc95e3:kaer.morhen","hashes":{"sha256":"17kPoH+h0Dk4Omn7Sus0qMb6+oGcf+CZFEgDhv7UKWs"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"jP4a04f5/F10Pw95FPpdCyKAO44JOwUQ/MZOOeA/RTU1Dn+AHPMzGSaZnuGjRr/xQuADt+I3ctb5ZQfLKNzHDw"}},"state_key":"","type":"m.room.create"}`), - []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"content":{"membership":"join"},"depth":1,"event_id":"$LEwEu0kxrtu5fOiS:kaer.morhen","hashes":{"sha256":"B7M88PhXf3vd1LaFtjQutFu4x/w7fHD28XKZ4sAsJTo"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"p2vqmuJn7ZBRImctSaKbXCAxCcBlIjPH9JHte1ouIUGy84gpu4eLipOvSBCLL26hXfC0Zrm4WUto6Hr+ohdrCg"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`), - []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"join_rule":"public"},"depth":2,"event_id":"$SMHlqUrNhhBBRLeN:kaer.morhen","hashes":{"sha256":"vIuJQvmMjrGxshAkj1SXe0C4RqvMbv4ZADDw9pFCWqQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"hBMsb3Qppo3RaqqAl4JyTgaiWEbW5hlckATky6PrHun+F3YM203TzG7w9clwuQU5F5pZoB1a6nw+to0hN90FAw"}},"state_key":"","type":"m.room.join_rules"}`), - []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"history_visibility":"shared"},"depth":3,"event_id":"$6F1yGIbO0J7TM93h:kaer.morhen","hashes":{"sha256":"Mr23GKSlZW7UCCYLgOWawI2Sg6KIoMjUWO2TDenuOgw"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$SMHlqUrNhhBBRLeN:kaer.morhen",{"sha256":"SylzE8U02I+6eyEHgL+FlU0L5YdqrVp8OOlxKS9VQW0"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sHLKrFI3hKGrEJfpMVZSDS3LvLasQsy50CTsOwru9XTVxgRsPo6wozNtRVjxo1J3Rk18RC9JppovmQ5VR5EcDw"}},"state_key":"","type":"m.room.history_visibility"}`), - []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"ban":50,"events":null,"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":null,"users_default":0},"depth":4,"event_id":"$UKNe10XzYzG0TeA9:kaer.morhen","hashes":{"sha256":"ngbP3yja9U5dlckKerUs/fSOhtKxZMCVvsfhPURSS28"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6F1yGIbO0J7TM93h:kaer.morhen",{"sha256":"A4CucrKSoWX4IaJXhq02mBg1sxIyZEftbC+5p3fZAvk"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zOmwlP01QL3yFchzuR9WHvogOoBZA3oVtNIF3lM0ZfDnqlSYZB9sns27G/4HVq0k7alaK7ZE3oGoCrVnMkPNCw"}},"state_key":"","type":"m.room.power_levels"}`), - // messages - []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`), - []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":6,"event_id":"$MYSbs8m4rEbsCWXD:kaer.morhen","hashes":{"sha256":"kgbYM7v4Ud2YaBsjBTolM4ySg6rHcJNYI6nWhMSdFUA"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$gl2T9l3qm0kUbiIJ:kaer.morhen",{"sha256":"C/rD04h9wGxRdN2G/IBfrgoE1UovzLZ+uskwaKZ37/Q"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"x0UoKh968jj/F5l1/R7Ew0T6CTKuew3PLNHASNxqck/bkNe8yYQiDHXRr+kZxObeqPZZTpaF1+EI+bLU9W8GDQ"}},"type":"m.room.message"}`), - []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":7,"event_id":"$N5x9WJkl9ClPrAEg:kaer.morhen","hashes":{"sha256":"FWM8oz4yquTunRZ67qlW2gzPDzdWfBP6RPHXhK1I/x8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$MYSbs8m4rEbsCWXD:kaer.morhen",{"sha256":"fatqgW+SE8mb2wFn3UN+drmluoD4UJ/EcSrL6Ur9q1M"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"Y+LX/xcyufoXMOIoqQBNOzy6lZfUGB1ffgXIrSugk6obMiyAsiRejHQN/pciZXsHKxMJLYRFAz4zSJoS/LGPAA"}},"type":"m.room.message"}`), - } - testEvents = []gomatrixserverlib.HeaderedEvent{} - testStateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent) - - kafkaPrefix = "Dendrite" - kafkaTopic = fmt.Sprintf("%s%s", kafkaPrefix, "OutputRoomEvent") -) - -func init() { - for _, j := range testData { - e, err := gomatrixserverlib.NewEventFromTrustedJSON(j, false, testRoomVersion) - if err != nil { - panic("cannot load test data: " + err.Error()) - } - h := e.Headered(testRoomVersion) - testEvents = append(testEvents, h) - if e.StateKey() != nil { - testStateEvents[gomatrixserverlib.StateKeyTuple{ - EventType: e.Type(), - StateKey: *e.StateKey(), - }] = h - } - } -} - -func waitForOffsetProcessed(t *testing.T, db storage.Database, offset int64) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - for { - poffsets, err := db.PartitionOffsets(ctx, kafkaTopic) - if err != nil { - t.Fatalf("failed to PartitionOffsets: %s", err) - } - for _, partition := range poffsets { - if partition.Offset >= offset { - return - } - } - time.Sleep(50 * time.Millisecond) - } -} - -func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) int64 { - value, err := json.Marshal(roomserverAPI.OutputEvent{ - Type: roomserverAPI.OutputTypeNewRoomEvent, - NewRoomEvent: out, - }) - if err != nil { - t.Fatalf("failed to marshal output event: %s", err) - } - _, offset, err := producer.SendMessage(&sarama.ProducerMessage{ - Topic: kafkaTopic, - Key: sarama.StringEncoder(out.Event.RoomID()), - Value: sarama.ByteEncoder(value), - }) - if err != nil { - t.Fatalf("failed to send message: %s", err) - } - return offset -} - -func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, storage.Database, sarama.SyncProducer, func()) { - cfg := &config.Dendrite{} - cfg.Defaults() - stateDBName := "test_state.db" - naffkaDBName := "test_naffka.db" - cfg.Global.ServerName = "kaer.morhen" - cfg.CurrentStateServer.Database.ConnectionString = config.DataSource("file:" + stateDBName) - cfg.Global.Kafka.TopicPrefix = kafkaPrefix - naffkaDB, err := naffkaStorage.NewDatabase("file:" + naffkaDBName) - if err != nil { - t.Fatalf("Failed to setup naffka database: %s", err) - } - naff, err := naffka.New(naffkaDB) - if err != nil { - t.Fatalf("Failed to create naffka consumer: %s", err) - } - stateAPI := NewInternalAPI(&cfg.CurrentStateServer, naff) - // type-cast to pull out the DB - stateAPIVal := stateAPI.(*internal.CurrentStateInternalAPI) - return stateAPI, stateAPIVal.DB, naff, func() { - os.Remove(naffkaDBName) - os.Remove(stateDBName) - } -} - -func mustMakeMembershipEvent(t *testing.T, roomID, userID, membership string) *roomserverAPI.OutputNewRoomEvent { - eb := gomatrixserverlib.EventBuilder{ - RoomID: roomID, - Sender: userID, - StateKey: &userID, - Type: "m.room.member", - Content: []byte(`{"membership":"` + membership + `"}`), - } - _, pkey, err := ed25519.GenerateKey(nil) - if err != nil { - t.Fatalf("failed to make ed25519 key: %s", err) - } - roomVer := gomatrixserverlib.RoomVersionV5 - ev, err := eb.Build( - time.Now(), gomatrixserverlib.ServerName("localhost"), gomatrixserverlib.KeyID("ed25519:test"), - pkey, roomVer, - ) - if err != nil { - t.Fatalf("mustMakeMembershipEvent failed: %s", err) - } - - return &roomserverAPI.OutputNewRoomEvent{ - Event: ev.Headered(roomVer), - AddsStateEventIDs: []string{ev.EventID()}, - } -} - -// This test makes sure that QuerySharedUsers is returning the correct users for a range of sets. -func TestQuerySharedUsers(t *testing.T) { - currStateAPI, db, producer, cancel := MustMakeInternalAPI(t) - defer cancel() - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@alice:localhost", "join")) - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@bob:localhost", "join")) - - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo2:bar", "@alice:localhost", "join")) - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo2:bar", "@charlie:localhost", "join")) - - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@alice:localhost", "join")) - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@bob:localhost", "join")) - MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@dave:localhost", "leave")) - - offset := MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo4:bar", "@alice:localhost", "join")) - waitForOffsetProcessed(t, db, offset) - - testCases := []struct { - req api.QuerySharedUsersRequest - wantRes api.QuerySharedUsersResponse - }{ - // Simple case: sharing (A,B) (A,C) (A,B) (A) produces (A:4,B:2,C:1) - { - req: api.QuerySharedUsersRequest{ - UserID: "@alice:localhost", - }, - wantRes: api.QuerySharedUsersResponse{ - UserIDsToCount: map[string]int{ - "@alice:localhost": 4, - "@bob:localhost": 2, - "@charlie:localhost": 1, - }, - }, - }, - - // Exclude (A,C): sharing (A,B) (A,B) (A) produces (A:3,B:2) - { - req: api.QuerySharedUsersRequest{ - UserID: "@alice:localhost", - ExcludeRoomIDs: []string{"!foo2:bar"}, - }, - wantRes: api.QuerySharedUsersResponse{ - UserIDsToCount: map[string]int{ - "@alice:localhost": 3, - "@bob:localhost": 2, - }, - }, - }, - - // Unknown user has no shared users - { - req: api.QuerySharedUsersRequest{ - UserID: "@unknownuser:localhost", - }, - wantRes: api.QuerySharedUsersResponse{ - UserIDsToCount: map[string]int{}, - }, - }, - - // left real user produces no shared users - { - req: api.QuerySharedUsersRequest{ - UserID: "@dave:localhost", - }, - wantRes: api.QuerySharedUsersResponse{ - UserIDsToCount: map[string]int{}, - }, - }, - - // left real user but with included room returns the included room member - { - req: api.QuerySharedUsersRequest{ - UserID: "@dave:localhost", - IncludeRoomIDs: []string{"!foo:bar"}, - }, - wantRes: api.QuerySharedUsersResponse{ - UserIDsToCount: map[string]int{ - "@alice:localhost": 1, - "@bob:localhost": 1, - }, - }, - }, - - // including a room more than once doesn't double counts - { - req: api.QuerySharedUsersRequest{ - UserID: "@dave:localhost", - IncludeRoomIDs: []string{"!foo:bar", "!foo:bar", "!foo:bar"}, - }, - wantRes: api.QuerySharedUsersResponse{ - UserIDsToCount: map[string]int{ - "@alice:localhost": 1, - "@bob:localhost": 1, - }, - }, - }, - } - - runCases := func(testAPI api.CurrentStateInternalAPI) { - for _, tc := range testCases { - var res api.QuerySharedUsersResponse - err := testAPI.QuerySharedUsers(context.Background(), &tc.req, &res) - if err != nil { - t.Errorf("QuerySharedUsers returned error: %s", err) - continue - } - if !reflect.DeepEqual(res.UserIDsToCount, tc.wantRes.UserIDsToCount) { - t.Errorf("QuerySharedUsers got users %+v want %+v", res.UserIDsToCount, tc.wantRes.UserIDsToCount) - } - } - } - - t.Run("HTTP API", func(t *testing.T) { - router := mux.NewRouter().PathPrefix(httputil.InternalPathPrefix).Subrouter() - AddInternalRoutes(router, currStateAPI) - apiURL, cancel := test.ListenAndServe(t, router, false) - defer cancel() - httpAPI, err := inthttp.NewCurrentStateAPIClient(apiURL, &http.Client{}) - if err != nil { - t.Fatalf("failed to create HTTP client") - } - runCases(httpAPI) - }) - t.Run("Monolith", func(t *testing.T) { - runCases(currStateAPI) - }) -} diff --git a/currentstateserver/internal/api.go b/currentstateserver/internal/api.go index 2c065c8e..22e711d6 100644 --- a/currentstateserver/internal/api.go +++ b/currentstateserver/internal/api.go @@ -54,33 +54,3 @@ func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req } return nil } - -func (a *CurrentStateInternalAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error { - roomIDs, err := a.DB.GetRoomsByMembership(ctx, req.UserID, "join") - if err != nil { - return err - } - roomIDs = append(roomIDs, req.IncludeRoomIDs...) - excludeMap := make(map[string]bool) - for _, roomID := range req.ExcludeRoomIDs { - excludeMap[roomID] = true - } - // filter out excluded rooms - j := 0 - for i := range roomIDs { - // move elements to include to the beginning of the slice - // then trim elements on the right - if !excludeMap[roomIDs[i]] { - roomIDs[j] = roomIDs[i] - j++ - } - } - roomIDs = roomIDs[:j] - - users, err := a.DB.JoinedUsersSetInRooms(ctx, roomIDs) - if err != nil { - return err - } - res.UserIDsToCount = users - return nil -} diff --git a/currentstateserver/inthttp/client.go b/currentstateserver/inthttp/client.go index 91a3359f..bb853839 100644 --- a/currentstateserver/inthttp/client.go +++ b/currentstateserver/inthttp/client.go @@ -28,7 +28,6 @@ import ( const ( QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser" QueryBulkStateContentPath = "/currentstateserver/queryBulkStateContent" - QuerySharedUsersPath = "/currentstateserver/querySharedUsers" ) // NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API. @@ -74,13 +73,3 @@ func (h *httpCurrentStateInternalAPI) QueryBulkStateContent( apiURL := h.apiURL + QueryBulkStateContentPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } - -func (h *httpCurrentStateInternalAPI) QuerySharedUsers( - ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "QuerySharedUsers") - defer span.Finish() - - apiURL := h.apiURL + QuerySharedUsersPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) -} diff --git a/currentstateserver/inthttp/server.go b/currentstateserver/inthttp/server.go index 46e78f7f..9b1f511a 100644 --- a/currentstateserver/inthttp/server.go +++ b/currentstateserver/inthttp/server.go @@ -51,17 +51,4 @@ func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) - internalAPIMux.Handle(QuerySharedUsersPath, - httputil.MakeInternalAPI("querySharedUsers", func(req *http.Request) util.JSONResponse { - request := api.QuerySharedUsersRequest{} - response := api.QuerySharedUsersResponse{} - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := intAPI.QuerySharedUsers(req.Context(), &request, &response); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) } diff --git a/roomserver/storage/postgres/rooms_table.go b/roomserver/storage/postgres/rooms_table.go index ef1b7891..ce635210 100644 --- a/roomserver/storage/postgres/rooms_table.go +++ b/roomserver/storage/postgres/rooms_table.go @@ -79,10 +79,10 @@ const selectRoomIDsSQL = "" + "SELECT room_id FROM roomserver_rooms" const bulkSelectRoomIDsSQL = "" + - "SELECT room_id FROM roomserver_rooms WHERE room_nid IN ($1)" + "SELECT room_id FROM roomserver_rooms WHERE room_nid = ANY($1)" const bulkSelectRoomNIDsSQL = "" + - "SELECT room_nid FROM roomserver_rooms WHERE room_id IN ($1)" + "SELECT room_nid FROM roomserver_rooms WHERE room_id = ANY($1)" type roomStatements struct { insertRoomNIDStmt *sql.Stmt diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index a3b33a4f..7e1ccd16 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -774,15 +774,18 @@ func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership } stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, userID) if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } return nil, fmt.Errorf("GetRoomsByMembership: cannot map user ID to state key NID: %w", err) } roomNIDs, err := d.MembershipTable.SelectRoomsWithMembership(ctx, stateKeyNID, membershipState) if err != nil { - return nil, err + return nil, fmt.Errorf("GetRoomsByMembership: failed to SelectRoomsWithMembership: %w", err) } roomIDs, err := d.RoomsTable.BulkSelectRoomIDs(ctx, roomNIDs) if err != nil { - return nil, err + return nil, fmt.Errorf("GetRoomsByMembership: failed to lookup room nids: %w", err) } if len(roomIDs) != len(roomNIDs) { return nil, fmt.Errorf("GetRoomsByMembership: missing room IDs, got %d want %d", len(roomIDs), len(roomNIDs)) diff --git a/roomserver/storage/sqlite3/membership_table.go b/roomserver/storage/sqlite3/membership_table.go index 0d5ce516..bb1ab39a 100644 --- a/roomserver/storage/sqlite3/membership_table.go +++ b/roomserver/storage/sqlite3/membership_table.go @@ -41,7 +41,7 @@ const membershipSchema = ` ` var selectJoinedUsersSetForRoomsSQL = "" + - "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership WHERE room_nid = ANY($1) AND" + + "SELECT target_nid, COUNT(room_nid) FROM roomserver_membership WHERE room_nid IN ($1) AND" + " membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " GROUP BY target_nid" // Insert a row in to membership table so that it can be locked by the diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 93fa822d..33797378 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -23,6 +23,7 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" syncinternal "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" syncapi "github.com/matrix-org/dendrite/syncapi/sync" @@ -36,7 +37,8 @@ type OutputKeyChangeEventConsumer struct { keyChangeConsumer *internal.ContinualConsumer db storage.Database serverName gomatrixserverlib.ServerName // our server name - currentStateAPI currentstateAPI.CurrentStateInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI + stateAPI currentstateAPI.CurrentStateInternalAPI keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex @@ -51,7 +53,8 @@ func NewOutputKeyChangeEventConsumer( kafkaConsumer sarama.Consumer, n *syncapi.Notifier, keyAPI api.KeyInternalAPI, - currentStateAPI currentstateAPI.CurrentStateInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, ) *OutputKeyChangeEventConsumer { @@ -67,7 +70,8 @@ func NewOutputKeyChangeEventConsumer( db: store, serverName: serverName, keyAPI: keyAPI, - currentStateAPI: currentStateAPI, + rsAPI: rsAPI, + stateAPI: stateAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, notifier: n, @@ -105,8 +109,8 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // work out who we need to notify about the new key - var queryRes currentstateAPI.QuerySharedUsersResponse - err := s.currentStateAPI.QuerySharedUsers(context.Background(), ¤tstateAPI.QuerySharedUsersRequest{ + var queryRes roomserverAPI.QuerySharedUsersResponse + err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ UserID: output.UserID, }, &queryRes) if err != nil { @@ -115,7 +119,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ - syncinternal.DeviceListLogName: &types.LogPosition{ + syncinternal.DeviceListLogName: { Offset: msg.Offset, Partition: msg.Partition, }, @@ -129,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -142,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go index 7d127aa8..f2f50aef 100644 --- a/syncapi/internal/keychange.go +++ b/syncapi/internal/keychange.go @@ -22,6 +22,7 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/keyserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -48,7 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID, // be already filled in with join/leave information. // nolint:gocyclo func DeviceListCatchup( - ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, res *types.Response, from, to types.StreamingToken, ) (hasNew bool, err error) { @@ -56,7 +58,7 @@ func DeviceListCatchup( newlyJoinedRooms := joinedRooms(res, userID) newlyLeftRooms := leftRooms(res) if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) + changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) if err != nil { return false, err } @@ -97,7 +99,7 @@ func DeviceListCatchup( } // QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user. var sharedUsersMap map[string]int - sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, stateAPI, userID, queryRes.UserIDs) + sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs) util.GetLogger(ctx).Debugf( "QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v", partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs, @@ -142,7 +144,7 @@ func DeviceListCatchup( // TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. // nolint:gocyclo func TrackChangedUsers( - ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, + ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, ) (changed, left []string, err error) { // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. @@ -151,8 +153,8 @@ func TrackChangedUsers( // - Get users in newly left room. - QueryCurrentState // - Loop set of users and decrement by 1 for each user in newly left room. // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. - var queryRes currentstateAPI.QuerySharedUsersResponse - err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + var queryRes roomserverAPI.QuerySharedUsersResponse + err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: userID, IncludeRoomIDs: newlyLeftRooms, }, &queryRes) @@ -193,7 +195,7 @@ func TrackChangedUsers( // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? // - If yes: then they already shared a room in common, do nothing. // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' - err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: userID, ExcludeRoomIDs: newlyJoinedRooms, }, &queryRes) @@ -228,11 +230,11 @@ func TrackChangedUsers( } func filterSharedUsers( - ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, usersWithChangedKeys []string, + ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, usersWithChangedKeys []string, ) (map[string]int, []string) { var result []string - var sharedUsersRes currentstateAPI.QuerySharedUsersResponse - err := stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + var sharedUsersRes roomserverAPI.QuerySharedUsersResponse + err := rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: userID, }, &sharedUsersRes) if err != nil { diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go index 03ec4e96..ee6ca1f4 100644 --- a/syncapi/internal/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -7,8 +7,9 @@ import ( "testing" "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/currentstateserver/api" + stateapi "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -49,17 +50,18 @@ func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.Inpu } -type mockCurrentStateAPI struct { +type mockRoomserverAPI struct { + api.RoomserverInternalAPITrace roomIDToJoinedMembers map[string][]string } // QueryRoomsForUser retrieves a list of room IDs matching the given query. -func (s *mockCurrentStateAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { +func (s *mockRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { return nil } // QueryBulkStateContent does a bulk query for state event content in the given rooms. -func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error { +func (s *mockRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error { res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string) if req.AllowWildcards && len(req.StateTuples) == 1 && req.StateTuples[0].EventType == gomatrixserverlib.MRoomMember && req.StateTuples[0].StateKey == "*" { for _, roomID := range req.RoomIDs { @@ -76,7 +78,7 @@ func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *ap } // QuerySharedUsers returns a list of users who share at least 1 room in common with the given user. -func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error { +func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error { roomsToQuery := req.IncludeRoomIDs for roomID, members := range s.roomIDToJoinedMembers { exclude := false @@ -106,6 +108,30 @@ func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.Que return nil } +type mockStateAPI struct { + rsAPI *mockRoomserverAPI +} + +// QueryRoomsForUser retrieves a list of room IDs matching the given query. +func (s *mockStateAPI) QueryRoomsForUser(ctx context.Context, req *stateapi.QueryRoomsForUserRequest, res *stateapi.QueryRoomsForUserResponse) error { + return nil +} + +// QueryBulkStateContent does a bulk query for state event content in the given rooms. +func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error { + var res2 api.QueryBulkStateContentResponse + err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{ + RoomIDs: req.RoomIDs, + AllowWildcards: req.AllowWildcards, + StateTuples: req.StateTuples, + }, &res2) + if err != nil { + return err + } + res.Rooms = res2.Rooms + return nil +} + type wantCatchup struct { hasNew bool changed []string @@ -173,12 +199,13 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -195,12 +222,13 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {removeUser}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -217,12 +245,13 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -238,12 +267,13 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -297,11 +327,12 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {syncingUser, existingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } @@ -322,13 +353,14 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -407,12 +439,15 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + rsAPI := &mockRoomserverAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {newShareUser, newShareUser2}, "!another:room": {syncingUser}, }, - }, syncingUser, syncResponse, emptyToken, newestToken) + } + hasNew, err := DeviceListCatchup( + context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken, + ) if err != nil { t.Fatalf("DeviceListCatchup returned an error: %s", err) } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 357df240..2859da71 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" keyapi "github.com/matrix-org/dendrite/keyserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" @@ -40,15 +41,16 @@ type RequestPool struct { userAPI userapi.UserInternalAPI notifier *Notifier keyAPI keyapi.KeyInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI stateAPI currentstateAPI.CurrentStateInternalAPI } // NewRequestPool makes a new RequestPool func NewRequestPool( db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, - stateAPI currentstateAPI.CurrentStateInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, ) *RequestPool { - return &RequestPool{db, userAPI, n, keyAPI, stateAPI} + return &RequestPool{db, userAPI, n, keyAPI, rsAPI, stateAPI} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -265,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea func (rp *RequestPool) appendDeviceLists( data *types.Response, userID string, since, to types.StreamingToken, ) (*types.Response, error) { - _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to) + _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to) if err != nil { return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 0f4ea828..634bd1ee 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -62,11 +62,11 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start notifier") } - requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) + requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI, currentStateAPI) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, notifier, keyAPI, currentStateAPI, syncDB, + consumer, notifier, keyAPI, rsAPI, currentStateAPI, syncDB, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer")