Send the entire room state down when transitioning to 'join' on a /sync response (#100)
This is only 'mostly' correct currently, because what should be no-op dupe joins will actually trigger the entire room state to be re-sent. Bizarrely, it's significantly easier to just do that than work out if we should, and there are no client-visible effects to doing so, so we just do it for now.main
parent
2a9abefd92
commit
cc7117392f
|
@ -392,7 +392,6 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure alice sees it TODO: prev_batch
|
// Make sure alice sees it TODO: prev_batch
|
||||||
// TODO: Make sure bob sees it AND all the current room state
|
|
||||||
testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{
|
testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{
|
||||||
"account_data": {
|
"account_data": {
|
||||||
"events": []
|
"events": []
|
||||||
|
@ -425,6 +424,46 @@ func main() {
|
||||||
}
|
}
|
||||||
}`)
|
}`)
|
||||||
|
|
||||||
|
// Make sure bob sees the room AND all the current room state TODO: history visibility
|
||||||
|
testSyncServer(syncServerCmdChan, "@bob:localhost", "9", `{
|
||||||
|
"account_data": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"next_batch": "10",
|
||||||
|
"presence": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"rooms": {
|
||||||
|
"invite": {},
|
||||||
|
"join": {
|
||||||
|
"!PjrbIMW2cIiaYF4t:localhost": {
|
||||||
|
"account_data": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"ephemeral": {
|
||||||
|
"events": []
|
||||||
|
},
|
||||||
|
"state": {
|
||||||
|
"events": [`+
|
||||||
|
clientEventTestData[0]+","+
|
||||||
|
clientEventTestData[1]+","+
|
||||||
|
clientEventTestData[2]+","+
|
||||||
|
clientEventTestData[3]+","+
|
||||||
|
clientEventTestData[4]+","+
|
||||||
|
clientEventTestData[8]+`]
|
||||||
|
},
|
||||||
|
"timeline": {
|
||||||
|
"limited": false,
|
||||||
|
"prev_batch": "",
|
||||||
|
"events": [`+
|
||||||
|
clientEventTestData[9]+`]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"leave": {}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost"
|
// $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost"
|
||||||
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[10]})); err != nil {
|
if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[10]})); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
|
@ -64,7 +64,7 @@ const selectMaxIDSQL = "" +
|
||||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||||
const selectStateInRangeSQL = "" +
|
const selectStateInRangeSQL = "" +
|
||||||
"SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" +
|
"SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" +
|
||||||
" WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||||
" ORDER BY id ASC"
|
" ORDER BY id ASC"
|
||||||
|
|
||||||
type outputRoomEventsStatements struct {
|
type outputRoomEventsStatements struct {
|
||||||
|
@ -102,7 +102,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateBetween returns the state events between the two given stream positions, exclusive of both.
|
// StateBetween returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos.
|
||||||
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
|
||||||
// two positions, only the most recent state is returned.
|
// two positions, only the most recent state is returned.
|
||||||
func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) {
|
func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) {
|
||||||
|
|
|
@ -16,8 +16,10 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/events"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -124,6 +126,41 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
|
// - Get membership list changes for this user in this sync response
|
||||||
|
// - For each room which has membership list changes:
|
||||||
|
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
||||||
|
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
||||||
|
// * TODO Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||||
|
// * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block.
|
||||||
|
|
||||||
|
// work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed.
|
||||||
|
for roomID, stateEvents := range state {
|
||||||
|
for _, ev := range stateEvents {
|
||||||
|
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
||||||
|
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
||||||
|
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
||||||
|
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
||||||
|
// the timeline.
|
||||||
|
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
||||||
|
var memberContent events.MemberContent
|
||||||
|
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if memberContent.Membership != "join" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
allState, err := d.roomstate.CurrentState(txn, roomID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
state[roomID] = allState
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, roomID := range roomIDs {
|
for _, roomID := range roomIDs {
|
||||||
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
|
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -64,7 +64,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
|
|
||||||
// Fork off 2 goroutines: one to do the work, and one to serve as a timeout.
|
// Fork off 2 goroutines: one to do the work, and one to serve as a timeout.
|
||||||
// Whichever returns first is the one we will serve back to the client.
|
// Whichever returns first is the one we will serve back to the client.
|
||||||
// TODO: Currently this means that cpu work is timed, which may not be what we want long term.
|
|
||||||
timeoutChan := make(chan struct{})
|
timeoutChan := make(chan struct{})
|
||||||
timer := time.AfterFunc(syncReq.timeout, func() {
|
timer := time.AfterFunc(syncReq.timeout, func() {
|
||||||
close(timeoutChan) // signal that the timeout has expired
|
close(timeoutChan) // signal that the timeout has expired
|
||||||
|
@ -72,8 +71,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
|
|
||||||
done := make(chan util.JSONResponse)
|
done := make(chan util.JSONResponse)
|
||||||
go func() {
|
go func() {
|
||||||
syncData, err := rp.currentSyncForUser(*syncReq)
|
currentPos := rp.notifier.WaitForEvents(*syncReq)
|
||||||
|
// We stop the timer BEFORE calculating the response so the cpu work
|
||||||
|
// done to calculate the response is not timed. This stops us from
|
||||||
|
// doing lots of work then timing out and sending back an empty response.
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
|
syncData, err := rp.currentSyncForUser(*syncReq, currentPos)
|
||||||
var res util.JSONResponse
|
var res util.JSONResponse
|
||||||
if err != nil {
|
if err != nil {
|
||||||
res = httputil.LogThenError(req, err)
|
res = httputil.LogThenError(req, err)
|
||||||
|
@ -98,9 +101,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) {
|
||||||
currentPos := rp.notifier.WaitForEvents(req)
|
|
||||||
|
|
||||||
if req.since == types.StreamPosition(0) {
|
if req.since == types.StreamPosition(0) {
|
||||||
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
|
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue