diff --git a/internal/consumers.go b/internal/consumers.go index d7917f23..c000c171 100644 --- a/internal/consumers.go +++ b/internal/consumers.go @@ -58,11 +58,17 @@ var ErrShutdown = fmt.Errorf("shutdown") // Returns nil once all the goroutines are started. // Returns an error if it can't start consuming for any of the partitions. func (c *ContinualConsumer) Start() error { + _, err := c.StartOffsets() + return err +} + +// StartOffsets is the same as Start but returns the loaded offsets as well. +func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error) { offsets := map[int32]int64{} partitions, err := c.Consumer.Partitions(c.Topic) if err != nil { - return err + return nil, err } for _, partition := range partitions { // Default all the offsets to the beginning of the stream. @@ -71,7 +77,7 @@ func (c *ContinualConsumer) Start() error { storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic) if err != nil { - return err + return nil, err } for _, offset := range storedOffsets { // We've already processed events from this partition so advance the offset to where we got to. @@ -87,7 +93,7 @@ func (c *ContinualConsumer) Start() error { for _, p := range partitionConsumers { p.Close() // nolint: errcheck } - return err + return nil, err } partitionConsumers = append(partitionConsumers, pc) } @@ -95,7 +101,7 @@ func (c *ContinualConsumer) Start() error { go c.consumePartition(pc) } - return nil + return storedOffsets, nil } // consumePartition consumes the room events for a single partition of the kafkaesque stream. diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 23961452..4a1c7309 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -17,13 +17,14 @@ package consumers import ( "context" "encoding/json" + "sync" "github.com/Shopify/sarama" currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" ) @@ -35,28 +36,33 @@ type OutputKeyChangeEventConsumer struct { serverName gomatrixserverlib.ServerName // our server name currentStateAPI currentstateAPI.CurrentStateInternalAPI // keyAPI api.KeyInternalAPI + partitionToOffset map[int32]int64 + partitionToOffsetMu sync.Mutex } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. // Call Start() to begin consuming from the key server. func NewOutputKeyChangeEventConsumer( - cfg *config.Dendrite, + serverName gomatrixserverlib.ServerName, + topic string, kafkaConsumer sarama.Consumer, currentStateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, ) *OutputKeyChangeEventConsumer { consumer := internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent), + Topic: topic, Consumer: kafkaConsumer, PartitionStore: store, } s := &OutputKeyChangeEventConsumer{ - keyChangeConsumer: &consumer, - db: store, - serverName: cfg.Matrix.ServerName, - currentStateAPI: currentStateAPI, + keyChangeConsumer: &consumer, + db: store, + serverName: serverName, + currentStateAPI: currentStateAPI, + partitionToOffset: make(map[int32]int64), + partitionToOffsetMu: sync.Mutex{}, } consumer.ProcessMessage = s.onMessage @@ -66,10 +72,25 @@ func NewOutputKeyChangeEventConsumer( // Start consuming from the key server func (s *OutputKeyChangeEventConsumer) Start() error { - return s.keyChangeConsumer.Start() + offsets, err := s.keyChangeConsumer.StartOffsets() + s.partitionToOffsetMu.Lock() + for _, o := range offsets { + s.partitionToOffset[o.Partition] = o.Offset + } + s.partitionToOffsetMu.Unlock() + return err +} + +func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage) { + s.partitionToOffsetMu.Lock() + defer s.partitionToOffsetMu.Unlock() + s.partitionToOffset[msg.Partition] = msg.Offset } func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + defer func() { + s.updateOffset(msg) + }() var output api.DeviceKeys if err := json.Unmarshal(msg.Value, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream @@ -78,18 +99,190 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } // work out who we need to notify about the new key var queryRes currentstateAPI.QuerySharedUsersResponse - err := s.currentStateAPI.QuerySharedUsers(context.Background(), ¤tstateAPI.QuerySharedUsersRequest{}, &queryRes) + err := s.currentStateAPI.QuerySharedUsers(context.Background(), ¤tstateAPI.QuerySharedUsersRequest{ + UserID: output.UserID, + }, &queryRes) if err != nil { log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server") return err } - // TODO: notify users by waking up streams + // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams return nil } -// Catchup returns a list of user IDs of users who have changed their device keys between the partition|offset given and now. -// Returns the new offset for this partition. -func (s *OutputKeyChangeEventConsumer) Catchup(parition int32, offset int64) (userIDs []string, newOffset int, err error) { - //return s.keyAPI.QueryKeyChangeCatchup(ctx, partition, offset) +// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response +// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST +// be already filled in with join/leave information. +func (s *OutputKeyChangeEventConsumer) Catchup( + ctx context.Context, userID string, res *types.Response, tok types.StreamingToken, +) (hasNew bool, err error) { + // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. + newlyJoinedRooms := joinedRooms(res, userID) + newlyLeftRooms := leftRooms(res) + if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { + changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms) + if err != nil { + return false, err + } + res.DeviceLists.Changed = changed + res.DeviceLists.Left = left + hasNew = len(changed) > 0 || len(left) > 0 + } + + // TODO: now also track users who we already share rooms with but who have updated their devices between the two tokens return } + +func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { + // work out who we are now sharing rooms with which we previously were not and notify them about the joining + // users keys: + changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) + if err != nil { + log.WithError(err).Error("OnJoinEvent: failed to work out changed users") + return + } + // TODO: f.e changed, wake up stream + for _, userID := range changed { + log.Infof("OnJoinEvent:Notify %s that %s should have device lists tracked", userID, *ev.StateKey()) + } +} + +func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { + // work out who we are no longer sharing any rooms with and notify them about the leaving user + _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) + if err != nil { + log.WithError(err).Error("OnLeaveEvent: failed to work out left users") + return + } + // TODO: f.e left, wake up stream + for _, userID := range left { + log.Infof("OnLeaveEvent:Notify %s that %s should no longer track device lists", userID, *ev.StateKey()) + } + +} + +// nolint:gocyclo +func (s *OutputKeyChangeEventConsumer) trackChangedUsers( + ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string, +) (changed, left []string, err error) { + // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. + + // Leave algorithm: + // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. + // - Get users in newly left room. - QueryCurrentState + // - Loop set of users and decrement by 1 for each user in newly left room. + // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. + var queryRes currentstateAPI.QuerySharedUsersResponse + err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + IncludeRoomIDs: newlyLeftRooms, + }, &queryRes) + if err != nil { + return nil, nil, err + } + var stateRes currentstateAPI.QueryBulkStateContentResponse + err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyLeftRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, nil, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + queryRes.UserIDsToCount[tuple.StateKey]-- + } + } + for userID, count := range queryRes.UserIDsToCount { + if count <= 0 { + left = append(left, userID) // left is returned + } + } + + // Join algorithm: + // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. + // - Get users in newly joined room - QueryCurrentState + // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? + // - If yes: then they already shared a room in common, do nothing. + // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' + err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + ExcludeRoomIDs: newlyJoinedRooms, + }, &queryRes) + if err != nil { + return nil, left, err + } + err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyJoinedRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, left, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + // new user who we weren't previously sharing rooms with + if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { + changed = append(changed, tuple.StateKey) // changed is returned + } + } + } + return changed, left, nil +} + +func joinedRooms(res *types.Response, userID string) []string { + var roomIDs []string + for roomID, join := range res.Rooms.Join { + // we would expect to see our join event somewhere if we newly joined the room. + // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. + newlyJoined := membershipEventPresent(join.State.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + continue + } + newlyJoined = membershipEventPresent(join.Timeline.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + } + } + return roomIDs +} + +func leftRooms(res *types.Response) []string { + roomIDs := make([]string, len(res.Rooms.Leave)) + i := 0 + for roomID := range res.Rooms.Leave { + roomIDs[i] = roomID + i++ + } + return roomIDs +} + +func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { + for _, ev := range events { + // it's enough to know that we have our member event here, don't need to check membership content + // as it's implied by being in the respective section of the sync response. + if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { + return true + } + } + return false +} diff --git a/syncapi/consumers/keychange_test.go b/syncapi/consumers/keychange_test.go new file mode 100644 index 00000000..9e7ede1f --- /dev/null +++ b/syncapi/consumers/keychange_test.go @@ -0,0 +1,400 @@ +package consumers + +import ( + "context" + "reflect" + "sort" + "testing" + + "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +var ( + syncingUser = "@alice:localhost" +) + +type mockCurrentStateAPI struct { + roomIDToJoinedMembers map[string][]string +} + +func (s *mockCurrentStateAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error { + return nil +} + +// QueryRoomsForUser retrieves a list of room IDs matching the given query. +func (s *mockCurrentStateAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error { + return nil +} + +// QueryBulkStateContent does a bulk query for state event content in the given rooms. +func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error { + res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string) + if req.AllowWildcards && len(req.StateTuples) == 1 && req.StateTuples[0].EventType == gomatrixserverlib.MRoomMember && req.StateTuples[0].StateKey == "*" { + for _, roomID := range req.RoomIDs { + res.Rooms[roomID] = make(map[gomatrixserverlib.StateKeyTuple]string) + for _, userID := range s.roomIDToJoinedMembers[roomID] { + res.Rooms[roomID][gomatrixserverlib.StateKeyTuple{ + EventType: gomatrixserverlib.MRoomMember, + StateKey: userID, + }] = "join" + } + } + } + return nil +} + +// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user. +func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error { + roomsToQuery := req.IncludeRoomIDs + for roomID, members := range s.roomIDToJoinedMembers { + exclude := false + for _, excludeRoomID := range req.ExcludeRoomIDs { + if roomID == excludeRoomID { + exclude = true + break + } + } + if exclude { + continue + } + for _, userID := range members { + if userID == req.UserID { + roomsToQuery = append(roomsToQuery, roomID) + break + } + } + } + + res.UserIDsToCount = make(map[string]int) + for _, roomID := range roomsToQuery { + for _, userID := range s.roomIDToJoinedMembers[roomID] { + res.UserIDsToCount[userID]++ + } + } + return nil +} + +type wantCatchup struct { + hasNew bool + changed []string + left []string +} + +func assertCatchup(t *testing.T, hasNew bool, syncResponse *types.Response, want wantCatchup) { + if hasNew != want.hasNew { + t.Errorf("got hasNew=%v want %v", hasNew, want.hasNew) + } + sort.Strings(syncResponse.DeviceLists.Left) + if !reflect.DeepEqual(syncResponse.DeviceLists.Left, want.left) { + t.Errorf("device_lists.left got %v want %v", syncResponse.DeviceLists.Left, want.left) + } + sort.Strings(syncResponse.DeviceLists.Changed) + if !reflect.DeepEqual(syncResponse.DeviceLists.Changed, want.changed) { + t.Errorf("device_lists.changed got %v want %v", syncResponse.DeviceLists.Changed, want.changed) + } +} + +func joinResponseWithRooms(syncResponse *types.Response, userID string, roomIDs []string) *types.Response { + for _, roomID := range roomIDs { + roomEvents := []gomatrixserverlib.ClientEvent{ + { + Type: "m.room.member", + StateKey: &userID, + EventID: "$something:here", + Sender: userID, + RoomID: roomID, + Content: []byte(`{"membership":"join"}`), + }, + } + + jr := syncResponse.Rooms.Join[roomID] + jr.State.Events = roomEvents + syncResponse.Rooms.Join[roomID] = jr + } + return syncResponse +} + +func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs []string) *types.Response { + for _, roomID := range roomIDs { + roomEvents := []gomatrixserverlib.ClientEvent{ + { + Type: "m.room.member", + StateKey: &userID, + EventID: "$something:here", + Sender: userID, + RoomID: roomID, + Content: []byte(`{"membership":"leave"}`), + }, + } + + lr := syncResponse.Rooms.Leave[roomID] + lr.Timeline.Events = roomEvents + syncResponse.Rooms.Leave[roomID] = lr + } + return syncResponse +} + +// tests that joining a room which results in sharing a new user includes that user in `changed` +func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { + newShareUser := "@bill:localhost" + newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar" + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + newlyJoinedRoom: {syncingUser, newShareUser}, + "!another:room": {syncingUser}, + }, + }, nil) + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + if err != nil { + t.Fatalf("Catchup returned an error: %s", err) + } + assertCatchup(t, hasNew, syncResponse, wantCatchup{ + hasNew: true, + changed: []string{newShareUser}, + }) +} + +// tests that leaving a room which results in sharing no rooms with a user includes that user in `left` +func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { + removeUser := "@bill:localhost" + newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar" + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + newlyLeftRoom: {removeUser}, + "!another:room": {syncingUser}, + }, + }, nil) + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + if err != nil { + t.Fatalf("Catchup returned an error: %s", err) + } + assertCatchup(t, hasNew, syncResponse, wantCatchup{ + hasNew: true, + left: []string{removeUser}, + }) +} + +// tests that joining a room which doesn't result in sharing a new user results in no changes. +func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { + existingUser := "@bob:localhost" + newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar" + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + newlyJoinedRoom: {syncingUser, existingUser}, + "!another:room": {syncingUser, existingUser}, + }, + }, nil) + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + if err != nil { + t.Fatalf("Catchup returned an error: %s", err) + } + assertCatchup(t, hasNew, syncResponse, wantCatchup{ + hasNew: false, + }) +} + +// tests that leaving a room which doesn't result in sharing no rooms with a user results in no changes. +func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { + existingUser := "@bob:localhost" + newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar" + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + newlyLeftRoom: {existingUser}, + "!another:room": {syncingUser, existingUser}, + }, + }, nil) + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + if err != nil { + t.Fatalf("Catchup returned an error: %s", err) + } + assertCatchup(t, hasNew, syncResponse, wantCatchup{ + hasNew: false, + }) +} + +// tests that not joining any rooms (but having messages in the response) do not result in changes. +func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { + existingUser := "@bob1:localhost" + roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar" + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {syncingUser, existingUser}, + }, + }, nil) + syncResponse := types.NewResponse() + empty := "" + roomStateEvents := []gomatrixserverlib.ClientEvent{ + { + Type: "m.room.name", + StateKey: &empty, + EventID: "$something:here", + Sender: existingUser, + RoomID: roomID, + Content: []byte(`{"name":"The Room Name"}`), + }, + } + roomTimelineEvents := []gomatrixserverlib.ClientEvent{ + { + Type: "m.room.message", + EventID: "$something1:here", + Sender: existingUser, + RoomID: roomID, + Content: []byte(`{"body":"Message 1"}`), + }, + { + Type: "m.room.message", + EventID: "$something2:here", + Sender: syncingUser, + RoomID: roomID, + Content: []byte(`{"body":"Message 2"}`), + }, + { + Type: "m.room.message", + EventID: "$something3:here", + Sender: existingUser, + RoomID: roomID, + Content: []byte(`{"body":"Message 3"}`), + }, + } + + jr := syncResponse.Rooms.Join[roomID] + jr.State.Events = roomStateEvents + jr.Timeline.Events = roomTimelineEvents + syncResponse.Rooms.Join[roomID] = jr + + hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + if err != nil { + t.Fatalf("Catchup returned an error: %s", err) + } + assertCatchup(t, hasNew, syncResponse, wantCatchup{ + hasNew: false, + }) +} + +// tests that joining/leaving multiple rooms can result in both `changed` and `left` and they are not duplicated. +func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { + newShareUser := "@berta:localhost" + newShareUser2 := "@bobby:localhost" + newlyLeftUser := "@charlie:localhost" + newlyLeftUser2 := "@debra:localhost" + newlyJoinedRoom := "!join:bar" + newlyLeftRoom := "!left:bar" + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, + newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, + "!another:room": {syncingUser}, + }, + }, nil) + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + if err != nil { + t.Fatalf("Catchup returned an error: %s", err) + } + assertCatchup(t, hasNew, syncResponse, wantCatchup{ + hasNew: true, + changed: []string{newShareUser, newShareUser2}, + left: []string{newlyLeftUser, newlyLeftUser2}, + }) +} + +// tests that joining/leaving the SAME room puts users in `left` if the final state is leave. +// NB: Consider the case: +// - Alice and Bob are in a room. +// - Alice goes offline, Charlie joins, sends encrypted messages then leaves the room. +// - Alice comes back online. Technically nothing has changed in the set of users between those two points in time, +// it's still just (Alice,Bob) but then we won't be tracking Charlie -- is this okay though? It's device keys +// which are only relevant when actively sending events I think? And if Alice does need the keys she knows +// charlie's (user_id, device_id) so can just hit /keys/query - no need to keep updated about it because she +// doesn't share any rooms with him. +// Ergo, we put them in `left` as it is simpler. +func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { + newShareUser := "@berta:localhost" + newShareUser2 := "@bobby:localhost" + roomID := "!join:bar" + consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {newShareUser, newShareUser2}, + "!another:room": {syncingUser}, + }, + }, nil) + syncResponse := types.NewResponse() + roomEvents := []gomatrixserverlib.ClientEvent{ + { + Type: "m.room.member", + StateKey: &syncingUser, + EventID: "$something:here", + Sender: syncingUser, + RoomID: roomID, + Content: []byte(`{"membership":"join"}`), + }, + { + Type: "m.room.message", + EventID: "$something2:here", + Sender: syncingUser, + RoomID: roomID, + Content: []byte(`{"body":"now I leave you"}`), + }, + { + Type: "m.room.member", + StateKey: &syncingUser, + EventID: "$something3:here", + Sender: syncingUser, + RoomID: roomID, + Content: []byte(`{"membership":"leave"}`), + }, + { + Type: "m.room.member", + StateKey: &syncingUser, + EventID: "$something4:here", + Sender: syncingUser, + RoomID: roomID, + Content: []byte(`{"membership":"join"}`), + }, + { + Type: "m.room.message", + EventID: "$something5:here", + Sender: syncingUser, + RoomID: roomID, + Content: []byte(`{"body":"now I am back, and I leave you for good"}`), + }, + { + Type: "m.room.member", + StateKey: &syncingUser, + EventID: "$something6:here", + Sender: syncingUser, + RoomID: roomID, + Content: []byte(`{"membership":"leave"}`), + }, + } + + lr := syncResponse.Rooms.Leave[roomID] + lr.Timeline.Events = roomEvents + syncResponse.Rooms.Leave[roomID] = lr + + hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0)) + if err != nil { + t.Fatalf("Catchup returned an error: %s", err) + } + assertCatchup(t, hasNew, syncResponse, wantCatchup{ + hasNew: true, + left: []string{newShareUser, newShareUser2}, + }) +} diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 019f2e69..7dc02281 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -302,6 +302,10 @@ type Response struct { ToDevice struct { Events []gomatrixserverlib.SendToDeviceEvent `json:"events"` } `json:"to_device"` + DeviceLists struct { + Changed []string `json:"changed,omitempty"` + Left []string `json:"left,omitempty"` + } `json:"device_lists,omitempty"` } // NewResponse creates an empty response with initialised maps.