Remove intermediary /sync response struct (#102)
The logic required to populate the right bits of `RoomData` tends towards the complete `/sync` response struct, so just use the actual response struct and save the hassle of mapping between the two. It may not make much difference in its current form, but the next PR will make use of this. This PR has no functional changes.main
parent
cc7117392f
commit
94e1c62745
|
@ -113,8 +113,7 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
||||||
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) {
|
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
||||||
data = make(map[string]types.RoomData)
|
|
||||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -126,6 +125,8 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res = types.NewResponse(toPos)
|
||||||
|
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
// - Get membership list changes for this user in this sync response
|
// - Get membership list changes for this user in this sync response
|
||||||
// - For each room which has membership list changes:
|
// - For each room which has membership list changes:
|
||||||
|
@ -167,35 +168,40 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
state[roomID] = removeDuplicates(state[roomID], recentEvents)
|
state[roomID] = removeDuplicates(state[roomID], recentEvents)
|
||||||
roomData := types.RoomData{
|
|
||||||
State: state[roomID],
|
jr := types.NewJoinResponse()
|
||||||
RecentEvents: recentEvents,
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
}
|
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
data[roomID] = roomData
|
jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync)
|
||||||
|
res.Rooms.Join[roomID] = *jr
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompleteSync returns all the data needed in order to create a complete sync response.
|
// CompleteSync a complete /sync API response for the given user.
|
||||||
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
|
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
||||||
data = make(map[string]types.RoomData)
|
|
||||||
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
||||||
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
||||||
|
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
|
||||||
|
// but it's better to not hide the fact that this is being done in a transaction.
|
||||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
// Get the current stream position which we will base the sync response on.
|
// Get the current stream position which we will base the sync response on.
|
||||||
id, err := d.events.MaxID(txn)
|
id, err := d.events.MaxID(txn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
pos = types.StreamPosition(id)
|
pos := types.StreamPosition(id)
|
||||||
|
|
||||||
// Extract room state and recent events for all rooms the user is joined to.
|
// Extract room state and recent events for all rooms the user is joined to.
|
||||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build up a /sync response
|
||||||
|
res = types.NewResponse(pos)
|
||||||
for _, roomID := range roomIDs {
|
for _, roomID := range roomIDs {
|
||||||
stateEvents, err := d.roomstate.CurrentState(txn, roomID)
|
stateEvents, err := d.roomstate.CurrentState(txn, roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -209,12 +215,13 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom
|
||||||
}
|
}
|
||||||
|
|
||||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||||
|
jr := types.NewJoinResponse()
|
||||||
data[roomID] = types.RoomData{
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
State: stateEvents,
|
jr.Timeline.Limited = true
|
||||||
RecentEvents: recentEvents,
|
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
|
||||||
}
|
res.Rooms.Join[roomID] = *jr
|
||||||
}
|
}
|
||||||
|
// TODO: Add invites!
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -102,36 +101,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) {
|
||||||
if req.since == types.StreamPosition(0) {
|
|
||||||
pos, data, err := rp.db.CompleteSync(req.userID, req.limit)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
res := types.NewResponse(pos)
|
|
||||||
for roomID, d := range data {
|
|
||||||
jr := types.NewJoinResponse()
|
|
||||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
jr.Timeline.Limited = true
|
|
||||||
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Join[roomID] = *jr
|
|
||||||
}
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
|
if req.since == types.StreamPosition(0) {
|
||||||
data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
|
return rp.db.CompleteSync(req.userID, req.limit)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit)
|
||||||
res := types.NewResponse(currentPos)
|
|
||||||
for roomID, d := range data {
|
|
||||||
jr := types.NewJoinResponse()
|
|
||||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
|
|
||||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
|
||||||
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
|
|
||||||
res.Rooms.Join[roomID] = *jr
|
|
||||||
}
|
|
||||||
return res, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,12 +28,6 @@ func (sp StreamPosition) String() string {
|
||||||
return strconv.FormatInt(int64(sp), 10)
|
return strconv.FormatInt(int64(sp), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RoomData represents the data for a room suitable for building a sync response from.
|
|
||||||
type RoomData struct {
|
|
||||||
State []gomatrixserverlib.Event
|
|
||||||
RecentEvents []gomatrixserverlib.Event
|
|
||||||
}
|
|
||||||
|
|
||||||
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||||
type Response struct {
|
type Response struct {
|
||||||
NextBatch string `json:"next_batch"`
|
NextBatch string `json:"next_batch"`
|
||||||
|
|
Loading…
Reference in New Issue