Handle room transitions to 'leave' in incremental /sync requests (#104)
* Add test for transition to leave * Add test for join/leave in the same /sync responsemain
parent
675759c192
commit
0a3d44a80a
|
@ -296,6 +296,16 @@ func testSyncServer(syncServerCmdChan chan error, userID, since, want string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeToRoomServerLog(indexes ...int) {
|
||||||
|
var roomEvents []string
|
||||||
|
for _, i := range indexes {
|
||||||
|
roomEvents = append(roomEvents, outputRoomEventTestData[i])
|
||||||
|
}
|
||||||
|
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(roomEvents)); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Runs a battery of sync server tests against test data in testdata.go
|
// Runs a battery of sync server tests against test data in testdata.go
|
||||||
// testdata.go has a list of OutputRoomEvents which will be fed into the kafka log which the sync server will consume.
|
// testdata.go has a list of OutputRoomEvents which will be fed into the kafka log which the sync server will consume.
|
||||||
// The tests will pause at various points in this list to conduct tests on the /sync responses before continuing.
|
// The tests will pause at various points in this list to conduct tests on the /sync responses before continuing.
|
||||||
|
@ -311,9 +321,11 @@ func main() {
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 2"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 2"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 3"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello world 3"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
||||||
// $ curl -XPUT -d '{"name":"Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"name":"Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(outputRoomEventTestData[0:9])); err != nil {
|
writeToRoomServerLog(
|
||||||
panic(err)
|
i0StateRoomCreate, i1StateAliceJoin, i2StatePowerLevels, i3StateJoinRules, i4StateHistoryVisibility,
|
||||||
}
|
i5AliceMsg, i6AliceMsg, i7AliceMsg, i8StateAliceRoomName,
|
||||||
|
)
|
||||||
|
|
||||||
// Make sure initial sync works TODO: prev_batch
|
// Make sure initial sync works TODO: prev_batch
|
||||||
testSyncServer(syncServerCmdChan, "@alice:localhost", "", `{
|
testSyncServer(syncServerCmdChan, "@alice:localhost", "", `{
|
||||||
"account_data": {
|
"account_data": {
|
||||||
|
@ -338,15 +350,15 @@ func main() {
|
||||||
},
|
},
|
||||||
"timeline": {
|
"timeline": {
|
||||||
"events": [`+
|
"events": [`+
|
||||||
clientEventTestData[0]+","+
|
clientEventTestData[i0StateRoomCreate]+","+
|
||||||
clientEventTestData[1]+","+
|
clientEventTestData[i1StateAliceJoin]+","+
|
||||||
clientEventTestData[2]+","+
|
clientEventTestData[i2StatePowerLevels]+","+
|
||||||
clientEventTestData[3]+","+
|
clientEventTestData[i3StateJoinRules]+","+
|
||||||
clientEventTestData[4]+","+
|
clientEventTestData[i4StateHistoryVisibility]+","+
|
||||||
clientEventTestData[5]+","+
|
clientEventTestData[i5AliceMsg]+","+
|
||||||
clientEventTestData[6]+","+
|
clientEventTestData[i6AliceMsg]+","+
|
||||||
clientEventTestData[7]+","+
|
clientEventTestData[i7AliceMsg]+","+
|
||||||
clientEventTestData[8]+`],
|
clientEventTestData[i8StateAliceRoomName]+`],
|
||||||
"limited": true,
|
"limited": true,
|
||||||
"prev_batch": ""
|
"prev_batch": ""
|
||||||
}
|
}
|
||||||
|
@ -387,9 +399,7 @@ func main() {
|
||||||
}`)
|
}`)
|
||||||
|
|
||||||
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
|
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[9]})); err != nil {
|
writeToRoomServerLog(i9StateBobJoin)
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure alice sees it TODO: prev_batch
|
// Make sure alice sees it TODO: prev_batch
|
||||||
testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{
|
testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{
|
||||||
|
@ -416,7 +426,7 @@ func main() {
|
||||||
"timeline": {
|
"timeline": {
|
||||||
"limited": false,
|
"limited": false,
|
||||||
"prev_batch": "",
|
"prev_batch": "",
|
||||||
"events": [`+clientEventTestData[9]+`]
|
"events": [`+clientEventTestData[i9StateBobJoin]+`]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -445,18 +455,18 @@ func main() {
|
||||||
},
|
},
|
||||||
"state": {
|
"state": {
|
||||||
"events": [`+
|
"events": [`+
|
||||||
clientEventTestData[0]+","+
|
clientEventTestData[i0StateRoomCreate]+","+
|
||||||
clientEventTestData[1]+","+
|
clientEventTestData[i1StateAliceJoin]+","+
|
||||||
clientEventTestData[2]+","+
|
clientEventTestData[i2StatePowerLevels]+","+
|
||||||
clientEventTestData[3]+","+
|
clientEventTestData[i3StateJoinRules]+","+
|
||||||
clientEventTestData[4]+","+
|
clientEventTestData[i4StateHistoryVisibility]+","+
|
||||||
clientEventTestData[8]+`]
|
clientEventTestData[i8StateAliceRoomName]+`]
|
||||||
},
|
},
|
||||||
"timeline": {
|
"timeline": {
|
||||||
"limited": false,
|
"limited": false,
|
||||||
"prev_batch": "",
|
"prev_batch": "",
|
||||||
"events": [`+
|
"events": [`+
|
||||||
clientEventTestData[9]+`]
|
clientEventTestData[i9StateBobJoin]+`]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -465,9 +475,8 @@ func main() {
|
||||||
}`)
|
}`)
|
||||||
|
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost"
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[10]})); err != nil {
|
writeToRoomServerLog(i10BobMsg)
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
// Make sure alice can see everything around the join point for bob TODO: prev_batch
|
// Make sure alice can see everything around the join point for bob TODO: prev_batch
|
||||||
testSyncServer(syncServerCmdChan, "@alice:localhost", "7", `{
|
testSyncServer(syncServerCmdChan, "@alice:localhost", "7", `{
|
||||||
"account_data": {
|
"account_data": {
|
||||||
|
@ -494,10 +503,10 @@ func main() {
|
||||||
"limited": false,
|
"limited": false,
|
||||||
"prev_batch": "",
|
"prev_batch": "",
|
||||||
"events": [`+
|
"events": [`+
|
||||||
clientEventTestData[7]+","+
|
clientEventTestData[i7AliceMsg]+","+
|
||||||
clientEventTestData[8]+","+
|
clientEventTestData[i8StateAliceRoomName]+","+
|
||||||
clientEventTestData[9]+","+
|
clientEventTestData[i9StateBobJoin]+","+
|
||||||
clientEventTestData[10]+`]
|
clientEventTestData[i10BobMsg]+`]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -508,9 +517,7 @@ func main() {
|
||||||
// $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost"
|
||||||
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost"
|
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost"
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(outputRoomEventTestData[11:14])); err != nil {
|
writeToRoomServerLog(i11StateAliceRoomName, i12AliceMsg, i13StateBobInviteCharlie)
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure charlie sees the invite both with and without a ?since= token
|
// Make sure charlie sees the invite both with and without a ?since= token
|
||||||
// TODO: Invite state should include the invite event and the room name.
|
// TODO: Invite state should include the invite event and the room name.
|
||||||
|
@ -540,8 +547,90 @@ func main() {
|
||||||
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
|
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
||||||
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost"
|
||||||
|
writeToRoomServerLog(i14StateCharlieJoin, i15AliceMsg, i16StateAliceKickCharlie)
|
||||||
|
|
||||||
|
// Check transitions to leave work
|
||||||
|
testSyncServer(syncServerCmdChan, "@charlie:localhost", "15", `{
|
||||||
|
"account_data": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"next_batch": "17",
|
||||||
|
"presence": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"rooms": {
|
||||||
|
"invite": {},
|
||||||
|
"join": {},
|
||||||
|
"leave": {
|
||||||
|
"!PjrbIMW2cIiaYF4t:localhost": {
|
||||||
|
"state": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"timeline": {
|
||||||
|
"limited": false,
|
||||||
|
"prev_batch": "",
|
||||||
|
"events": [`+
|
||||||
|
clientEventTestData[i15AliceMsg]+","+
|
||||||
|
clientEventTestData[i16StateAliceKickCharlie]+`]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
|
// Test joining and leaving the same room in a single /sync request puts the room in the 'leave' section.
|
||||||
|
// TODO: Use an earlier since value to assert that the /sync response doesn't leak messages
|
||||||
|
// from before charlie was joined to the room. Currently it does leak because RecentEvents doesn't
|
||||||
|
// take membership into account.
|
||||||
|
testSyncServer(syncServerCmdChan, "@charlie:localhost", "14", `{
|
||||||
|
"account_data": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"next_batch": "17",
|
||||||
|
"presence": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"rooms": {
|
||||||
|
"invite": {},
|
||||||
|
"join": {},
|
||||||
|
"leave": {
|
||||||
|
"!PjrbIMW2cIiaYF4t:localhost": {
|
||||||
|
"state": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"timeline": {
|
||||||
|
"limited": false,
|
||||||
|
"prev_batch": "",
|
||||||
|
"events": [`+
|
||||||
|
clientEventTestData[i14StateCharlieJoin]+","+
|
||||||
|
clientEventTestData[i15AliceMsg]+","+
|
||||||
|
clientEventTestData[i16StateAliceKickCharlie]+`]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
|
||||||
// $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"
|
||||||
|
writeToRoomServerLog(i17BobMsg, i18StateAliceRoomName)
|
||||||
|
|
||||||
|
// Check that users don't see state changes in rooms after they have left
|
||||||
|
testSyncServer(syncServerCmdChan, "@charlie:localhost", "17", `{
|
||||||
|
"account_data": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"next_batch": "19",
|
||||||
|
"presence": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"rooms": {
|
||||||
|
"invite": {},
|
||||||
|
"join": {},
|
||||||
|
"leave": {}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"whatever"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"whatever"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
|
||||||
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
|
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@bob:localhost?access_token=@bob:localhost"
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"im alone now"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"im alone now"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
|
||||||
|
|
|
@ -14,6 +14,37 @@
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
|
const (
|
||||||
|
i0StateRoomCreate = iota
|
||||||
|
i1StateAliceJoin
|
||||||
|
i2StatePowerLevels
|
||||||
|
i3StateJoinRules
|
||||||
|
i4StateHistoryVisibility
|
||||||
|
i5AliceMsg
|
||||||
|
i6AliceMsg
|
||||||
|
i7AliceMsg
|
||||||
|
i8StateAliceRoomName
|
||||||
|
i9StateBobJoin
|
||||||
|
i10BobMsg
|
||||||
|
i11StateAliceRoomName
|
||||||
|
i12AliceMsg
|
||||||
|
i13StateBobInviteCharlie
|
||||||
|
i14StateCharlieJoin
|
||||||
|
i15AliceMsg
|
||||||
|
i16StateAliceKickCharlie
|
||||||
|
i17BobMsg
|
||||||
|
i18StateAliceRoomName
|
||||||
|
i19BobMsg
|
||||||
|
i20StateBobLeave
|
||||||
|
i21AliceMsg
|
||||||
|
i22StateAliceInviteBob
|
||||||
|
i23StateBobRejectInvite
|
||||||
|
i24AliceMsg
|
||||||
|
i25StateAliceRoomName
|
||||||
|
i26StateCharlieJoin
|
||||||
|
i27CharlieMsg
|
||||||
|
)
|
||||||
|
|
||||||
var outputRoomEventTestData = []string{
|
var outputRoomEventTestData = []string{
|
||||||
// $ curl -XPOST -d '{}' "http://localhost:8009/_matrix/client/r0/createRoom?access_token=@alice:localhost"
|
// $ curl -XPOST -d '{}' "http://localhost:8009/_matrix/client/r0/createRoom?access_token=@alice:localhost"
|
||||||
`{"Event":{"auth_events":[],"content":{"creator":"@alice:localhost"},"depth":1,"event_id":"$xz0fUB8zNMTGFh1W:localhost","hashes":{"sha256":"KKkpxS8NoH0igBbL3J+nJ39MRlmA7QgW4BGL7Fv4ASI"},"origin":"localhost","origin_server_ts":1494411218382,"prev_events":[],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"uZG5Q/Hs2Z611gFlZPdwomomRJKf70xV2FQV+gLWM1XgzkLDRlRF3cBZc9y3CnHKnV/upTcXs7Op2/GmgD3UBw"}},"state_key":"","type":"m.room.create"},"VisibilityEventIDs":null,"LatestEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"AddsStateEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":""}`,
|
`{"Event":{"auth_events":[],"content":{"creator":"@alice:localhost"},"depth":1,"event_id":"$xz0fUB8zNMTGFh1W:localhost","hashes":{"sha256":"KKkpxS8NoH0igBbL3J+nJ39MRlmA7QgW4BGL7Fv4ASI"},"origin":"localhost","origin_server_ts":1494411218382,"prev_events":[],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"uZG5Q/Hs2Z611gFlZPdwomomRJKf70xV2FQV+gLWM1XgzkLDRlRF3cBZc9y3CnHKnV/upTcXs7Op2/GmgD3UBw"}},"state_key":"","type":"m.room.create"},"VisibilityEventIDs":null,"LatestEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"AddsStateEventIDs":["$xz0fUB8zNMTGFh1W:localhost"],"RemovesStateEventIDs":null,"LastSentEventID":""}`,
|
||||||
|
|
|
@ -25,6 +25,12 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type stateDelta struct {
|
||||||
|
roomID string
|
||||||
|
stateEvents []gomatrixserverlib.Event
|
||||||
|
membership string
|
||||||
|
}
|
||||||
|
|
||||||
// SyncServerDatabase represents a sync server database
|
// SyncServerDatabase represents a sync server database
|
||||||
type SyncServerDatabase struct {
|
type SyncServerDatabase struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
@ -115,67 +121,44 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
|
||||||
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
||||||
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
||||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
// Work out which rooms to return in the response. This is done by getting not only the currently
|
||||||
if err != nil {
|
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
|
||||||
return err
|
// This works out what the 'state' key should be for each room as well as which membership block
|
||||||
}
|
// to put the room into.
|
||||||
|
deltas, err := d.getStateDeltas(txn, fromPos, toPos, userID)
|
||||||
state, err := d.events.StateBetween(txn, fromPos, toPos)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
res = types.NewResponse(toPos)
|
res = types.NewResponse(toPos)
|
||||||
|
for _, delta := range deltas {
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
recentEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, toPos, numRecentEventsPerRoom)
|
||||||
// - Get membership list changes for this user in this sync response
|
|
||||||
// - For each room which has membership list changes:
|
|
||||||
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
|
||||||
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
|
||||||
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
|
||||||
// * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block.
|
|
||||||
|
|
||||||
// work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed.
|
|
||||||
for roomID, stateEvents := range state {
|
|
||||||
for _, ev := range stateEvents {
|
|
||||||
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
|
||||||
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
|
||||||
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
|
||||||
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
|
||||||
// the timeline.
|
|
||||||
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
|
||||||
var memberContent events.MemberContent
|
|
||||||
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if memberContent.Membership != "join" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
allState, err := d.roomstate.CurrentState(txn, roomID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
state[roomID] = allState
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, roomID := range roomIDs {
|
|
||||||
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
state[roomID] = removeDuplicates(state[roomID], recentEvents)
|
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
||||||
|
|
||||||
jr := types.NewJoinResponse()
|
switch delta.membership {
|
||||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
case "join":
|
||||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
jr := types.NewJoinResponse()
|
||||||
jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Join[roomID] = *jr
|
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
|
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
res.Rooms.Join[delta.roomID] = *jr
|
||||||
|
case "leave":
|
||||||
|
fallthrough // transitions to leave are the same as ban
|
||||||
|
case "ban":
|
||||||
|
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
||||||
|
// no longer in the room.
|
||||||
|
lr := types.NewLeaveResponse()
|
||||||
|
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
|
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
res.Rooms.Leave[delta.roomID] = *lr
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: This should be done in getStateDeltas
|
||||||
return d.addInvitesToResponse(txn, userID, res)
|
return d.addInvitesToResponse(txn, userID, res)
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
@ -242,6 +225,67 @@ func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, re
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
|
||||||
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
|
// - Get membership list changes for this user in this sync response
|
||||||
|
// - For each room which has membership list changes:
|
||||||
|
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
||||||
|
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
||||||
|
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||||
|
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
|
||||||
|
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
|
||||||
|
var deltas []stateDelta
|
||||||
|
|
||||||
|
// get all the state events ever between these two positions
|
||||||
|
state, err := d.events.StateBetween(txn, fromPos, toPos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for roomID, stateEvents := range state {
|
||||||
|
for _, ev := range stateEvents {
|
||||||
|
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
||||||
|
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
||||||
|
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
||||||
|
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
||||||
|
// the timeline.
|
||||||
|
if membership := getMembershipFromEvent(&ev, userID); membership != "" {
|
||||||
|
if membership == "join" {
|
||||||
|
// send full room state down instead of a delta
|
||||||
|
var allState []gomatrixserverlib.Event
|
||||||
|
allState, err = d.roomstate.CurrentState(txn, roomID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
state[roomID] = allState
|
||||||
|
continue // we'll add this room in when we do joined rooms
|
||||||
|
}
|
||||||
|
|
||||||
|
deltas = append(deltas, stateDelta{
|
||||||
|
membership: membership,
|
||||||
|
stateEvents: stateEvents,
|
||||||
|
roomID: roomID,
|
||||||
|
})
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add in currently joined rooms
|
||||||
|
joinedRoomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, joinedRoomID := range joinedRoomIDs {
|
||||||
|
deltas = append(deltas, stateDelta{
|
||||||
|
membership: "join",
|
||||||
|
stateEvents: state[joinedRoomID],
|
||||||
|
roomID: joinedRoomID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return deltas, nil
|
||||||
|
}
|
||||||
|
|
||||||
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
||||||
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
||||||
// only, so clients get to the correct state once they have rolled forward.
|
// only, so clients get to the correct state once they have rolled forward.
|
||||||
|
@ -267,6 +311,19 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom
|
||||||
return stateEvents
|
return stateEvents
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
|
||||||
|
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
|
||||||
|
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
|
||||||
|
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
||||||
|
var memberContent events.MemberContent
|
||||||
|
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return memberContent.Membership
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||||
txn, err := db.Begin()
|
txn, err := db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue