diff --git a/keyserver/api/api.go b/keyserver/api/api.go index d42fb60c..406a252d 100644 --- a/keyserver/api/api.go +++ b/keyserver/api/api.go @@ -26,6 +26,7 @@ type KeyInternalAPI interface { // PerformClaimKeys claims one-time keys for use in pre-key messages PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse) QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse) + QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse) } // KeyError is returned if there was a problem performing/querying the server @@ -131,3 +132,19 @@ type QueryKeysResponse struct { // Set if there was a fatal error processing this query Error *KeyError } + +type QueryKeyChangesRequest struct { + // The partition which had key events sent to + Partition int32 + // The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning + Offset int64 +} + +type QueryKeyChangesResponse struct { + // The set of users who have had their keys change. + UserIDs []string + // The latest offset represented in this response. + Offset int64 + // Set if there was a problem handling the request. + Error *KeyError +} diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index d3a6d4ba..240a5640 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -40,6 +40,17 @@ type KeyInternalAPI struct { Producer *producers.KeyChange } +func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) { + userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset) + if err != nil { + res.Error = &api.KeyError{ + Err: err.Error(), + } + } + res.Offset = latest + res.UserIDs = userIDs +} + func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) { res.KeyErrors = make(map[string]map[string]*api.KeyError) a.uploadDeviceKeys(ctx, req, res) diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go index 4c0f1e53..cd9cf70d 100644 --- a/keyserver/inthttp/client.go +++ b/keyserver/inthttp/client.go @@ -29,6 +29,7 @@ const ( PerformUploadKeysPath = "/keyserver/performUploadKeys" PerformClaimKeysPath = "/keyserver/performClaimKeys" QueryKeysPath = "/keyserver/queryKeys" + QueryKeyChangesPath = "/keyserver/queryKeyChanges" ) // NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API. @@ -101,3 +102,20 @@ func (h *httpKeyInternalAPI) QueryKeys( } } } + +func (h *httpKeyInternalAPI) QueryKeyChanges( + ctx context.Context, + request *api.QueryKeyChangesRequest, + response *api.QueryKeyChangesResponse, +) { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeyChanges") + defer span.Finish() + + apiURL := h.apiURL + QueryKeyChangesPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) + if err != nil { + response.Error = &api.KeyError{ + Err: err.Error(), + } + } +} diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go index ec78b613..f3d2882c 100644 --- a/keyserver/inthttp/server.go +++ b/keyserver/inthttp/server.go @@ -58,4 +58,15 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle(QueryKeyChangesPath, + httputil.MakeInternalAPI("queryKeyChanges", func(req *http.Request) util.JSONResponse { + request := api.QueryKeyChangesRequest{} + response := api.QueryKeyChangesResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + s.QueryKeyChanges(req.Context(), &request, &response) + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 4a1c7309..78aff601 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -26,16 +26,17 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) // OutputKeyChangeEventConsumer consumes events that originated in the key server. type OutputKeyChangeEventConsumer struct { - keyChangeConsumer *internal.ContinualConsumer - db storage.Database - serverName gomatrixserverlib.ServerName // our server name - currentStateAPI currentstateAPI.CurrentStateInternalAPI - // keyAPI api.KeyInternalAPI + keyChangeConsumer *internal.ContinualConsumer + db storage.Database + serverName gomatrixserverlib.ServerName // our server name + currentStateAPI currentstateAPI.CurrentStateInternalAPI + keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex } @@ -46,6 +47,7 @@ func NewOutputKeyChangeEventConsumer( serverName gomatrixserverlib.ServerName, topic string, kafkaConsumer sarama.Consumer, + keyAPI api.KeyInternalAPI, currentStateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, ) *OutputKeyChangeEventConsumer { @@ -60,6 +62,7 @@ func NewOutputKeyChangeEventConsumer( keyChangeConsumer: &consumer, db: store, serverName: serverName, + keyAPI: keyAPI, currentStateAPI: currentStateAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, @@ -115,21 +118,44 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er // be already filled in with join/leave information. func (s *OutputKeyChangeEventConsumer) Catchup( ctx context.Context, userID string, res *types.Response, tok types.StreamingToken, -) (hasNew bool, err error) { +) (newTok *types.StreamingToken, hasNew bool, err error) { // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. newlyJoinedRooms := joinedRooms(res, userID) newlyLeftRooms := leftRooms(res) if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms) if err != nil { - return false, err + return nil, false, err } res.DeviceLists.Changed = changed res.DeviceLists.Left = left hasNew = len(changed) > 0 || len(left) > 0 } - // TODO: now also track users who we already share rooms with but who have updated their devices between the two tokens + // now also track users who we already share rooms with but who have updated their devices between the two tokens + // TODO: Extract partition/offset from sync token + var partition int32 + var offset int64 + var queryRes api.QueryKeyChangesResponse + s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ + Partition: partition, + Offset: offset, + }, &queryRes) + if queryRes.Error != nil { + // don't fail the catchup because we may have got useful information by tracking membership + util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") + } else { + // TODO: Make a new streaming token using the new offset + userSet := make(map[string]bool) + for _, userID := range res.DeviceLists.Changed { + userSet[userID] = true + } + for _, userID := range queryRes.UserIDs { + if !userSet[userID] { + res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) + } + } + } return } diff --git a/syncapi/consumers/keychange_test.go b/syncapi/consumers/keychange_test.go index 7322e208..f8e96570 100644 --- a/syncapi/consumers/keychange_test.go +++ b/syncapi/consumers/keychange_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/matrix-org/dendrite/currentstateserver/api" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -15,6 +16,19 @@ var ( syncingUser = "@alice:localhost" ) +type mockKeyAPI struct{} + +func (k *mockKeyAPI) PerformUploadKeys(ctx context.Context, req *keyapi.PerformUploadKeysRequest, res *keyapi.PerformUploadKeysResponse) { +} + +// PerformClaimKeys claims one-time keys for use in pre-key messages +func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) { +} +func (k *mockKeyAPI) QueryKeys(ctx context.Context, req *keyapi.QueryKeysRequest, res *keyapi.QueryKeysResponse) { +} +func (k *mockKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyChangesRequest, res *keyapi.QueryKeyChangesResponse) { +} + type mockCurrentStateAPI struct { roomIDToJoinedMembers map[string][]string } @@ -144,7 +158,7 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { newShareUser := "@bill:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser}, "!another:room": {syncingUser}, @@ -153,7 +167,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -167,7 +181,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { removeUser := "@bill:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {removeUser}, "!another:room": {syncingUser}, @@ -176,7 +190,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -190,7 +204,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { existingUser := "@bob:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser}, @@ -199,7 +213,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -212,7 +226,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { existingUser := "@bob:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {existingUser}, "!another:room": {syncingUser, existingUser}, @@ -221,7 +235,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { syncResponse := types.NewResponse() syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -234,7 +248,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { existingUser := "@bob1:localhost" roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {syncingUser, existingUser}, }, @@ -280,7 +294,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -297,7 +311,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { newlyLeftUser2 := "@debra:localhost" newlyJoinedRoom := "!join:bar" newlyLeftRoom := "!left:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, @@ -308,7 +322,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -333,7 +347,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { newShareUser := "@berta:localhost" newShareUser2 := "@bobby:localhost" roomID := "!join:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ roomID: {newShareUser, newShareUser2}, "!another:room": {syncingUser}, @@ -393,7 +407,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) if err != nil { t.Fatalf("Catchup returned an error: %s", err) }