Add /sync API response structs (#58)
parent
9a266762b4
commit
da0fffcc19
|
@ -4,6 +4,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/sync/syncapi"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
@ -41,13 +42,14 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
||||||
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
||||||
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
// when generating the stream position for this event. Returns the sync stream position for the inserted event.
|
||||||
// Returns an error if there was a problem inserting this event.
|
// Returns an error if there was a problem inserting this event.
|
||||||
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos int64, returnErr error) {
|
func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) (streamPos syncapi.StreamPosition, returnErr error) {
|
||||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
var err error
|
var err error
|
||||||
streamPos, err = d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
|
pos, err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
streamPos = syncapi.StreamPosition(pos)
|
||||||
|
|
||||||
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 {
|
if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 {
|
||||||
// Nothing to do, the event may have just been a message event.
|
// Nothing to do, the event may have just been a message event.
|
||||||
|
@ -86,13 +88,17 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
||||||
func (d *SyncServerDatabase) SyncStreamPosition() (int64, error) {
|
func (d *SyncServerDatabase) SyncStreamPosition() (syncapi.StreamPosition, error) {
|
||||||
return d.events.MaxID()
|
id, err := d.events.MaxID()
|
||||||
|
if err != nil {
|
||||||
|
return syncapi.StreamPosition(0), err
|
||||||
|
}
|
||||||
|
return syncapi.StreamPosition(id), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos.
|
// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos.
|
||||||
func (d *SyncServerDatabase) EventsInRange(oldPos, newPos int64) ([]gomatrixserverlib.Event, error) {
|
func (d *SyncServerDatabase) EventsInRange(oldPos, newPos syncapi.StreamPosition) ([]gomatrixserverlib.Event, error) {
|
||||||
return d.events.InRange(oldPos, newPos)
|
return d.events.InRange(int64(oldPos), int64(newPos))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
"github.com/matrix-org/dendrite/clientapi/storage"
|
"github.com/matrix-org/dendrite/clientapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/sync/syncapi"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
@ -20,7 +21,7 @@ const defaultSyncTimeout = time.Duration(30) * time.Second
|
||||||
type syncRequest struct {
|
type syncRequest struct {
|
||||||
userID string
|
userID string
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
since syncStreamPosition
|
since syncapi.StreamPosition
|
||||||
wantFullState bool
|
wantFullState bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +29,7 @@ type syncRequest struct {
|
||||||
type RequestPool struct {
|
type RequestPool struct {
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
// The latest sync stream position: guarded by 'cond'.
|
// The latest sync stream position: guarded by 'cond'.
|
||||||
currPos syncStreamPosition
|
currPos syncapi.StreamPosition
|
||||||
// A condition variable to notify all waiting goroutines of a new sync stream position
|
// A condition variable to notify all waiting goroutines of a new sync stream position
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
}
|
}
|
||||||
|
@ -39,7 +40,7 @@ func NewRequestPool(db *storage.SyncServerDatabase) (*RequestPool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &RequestPool{db, syncStreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil
|
return &RequestPool{db, syncapi.StreamPosition(pos), sync.NewCond(&sync.Mutex{})}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||||
|
@ -114,7 +115,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons
|
||||||
// OnNewEvent is called when a new event is received from the room server. Must only be
|
// OnNewEvent is called when a new event is received from the room server. Must only be
|
||||||
// called from a single goroutine, to avoid races between updates which could set the
|
// called from a single goroutine, to avoid races between updates which could set the
|
||||||
// current position in the stream incorrectly.
|
// current position in the stream incorrectly.
|
||||||
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPosition) {
|
func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncapi.StreamPosition) {
|
||||||
// update the current position in a guard and then notify all /sync streams
|
// update the current position in a guard and then notify all /sync streams
|
||||||
rp.cond.L.Lock()
|
rp.cond.L.Lock()
|
||||||
rp.currPos = pos
|
rp.currPos = pos
|
||||||
|
@ -123,7 +124,7 @@ func (rp *RequestPool) OnNewEvent(ev *gomatrixserverlib.Event, pos syncStreamPos
|
||||||
rp.cond.Broadcast() // notify ALL waiting goroutines
|
rp.cond.Broadcast() // notify ALL waiting goroutines
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition {
|
func (rp *RequestPool) waitForEvents(req syncRequest) syncapi.StreamPosition {
|
||||||
// In a guard, check if the /sync request should block, and block it until we get a new position
|
// In a guard, check if the /sync request should block, and block it until we get a new position
|
||||||
rp.cond.L.Lock()
|
rp.cond.L.Lock()
|
||||||
currentPos := rp.currPos
|
currentPos := rp.currPos
|
||||||
|
@ -137,9 +138,38 @@ func (rp *RequestPool) waitForEvents(req syncRequest) syncStreamPosition {
|
||||||
return currentPos
|
return currentPos
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) ([]gomatrixserverlib.Event, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*syncapi.Response, error) {
|
||||||
currentPos := rp.waitForEvents(req)
|
currentPos := rp.waitForEvents(req)
|
||||||
return rp.db.EventsInRange(int64(req.since), int64(currentPos))
|
|
||||||
|
// TODO: handle ignored users
|
||||||
|
|
||||||
|
// TODO: Implement https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
|
// 1) Get the CURRENT joined room list for this user
|
||||||
|
// 2) Get membership list changes for this user between the provided stream position and now.
|
||||||
|
// 3) For each room which has membership list changes:
|
||||||
|
// a) Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins).
|
||||||
|
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
||||||
|
// b) Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||||
|
// c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch?
|
||||||
|
// 4) Add joined rooms (joined room list)
|
||||||
|
|
||||||
|
events, err := rp.db.EventsInRange(req.since, currentPos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res := syncapi.NewResponse()
|
||||||
|
// for now, dump everything as join timeline events
|
||||||
|
for _, ev := range events {
|
||||||
|
roomData := res.Rooms.Join[ev.RoomID()]
|
||||||
|
roomData.Timeline.Events = append(roomData.Timeline.Events, ev)
|
||||||
|
res.Rooms.Join[ev.RoomID()] = roomData
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
|
||||||
|
// as an integer even though (at the moment) it is.
|
||||||
|
res.NextBatch = currentPos.String()
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTimeout(timeoutMS string) time.Duration {
|
func getTimeout(timeoutMS string) time.Duration {
|
||||||
|
@ -153,13 +183,13 @@ func getTimeout(timeoutMS string) time.Duration {
|
||||||
return time.Duration(i) * time.Millisecond
|
return time.Duration(i) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSyncStreamPosition(since string) (syncStreamPosition, error) {
|
func getSyncStreamPosition(since string) (syncapi.StreamPosition, error) {
|
||||||
if since == "" {
|
if since == "" {
|
||||||
return syncStreamPosition(0), nil
|
return syncapi.StreamPosition(0), nil
|
||||||
}
|
}
|
||||||
i, err := strconv.Atoi(since)
|
i, err := strconv.Atoi(since)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return syncStreamPosition(0), err
|
return syncapi.StreamPosition(0), err
|
||||||
}
|
}
|
||||||
return syncStreamPosition(i), nil
|
return syncapi.StreamPosition(i), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,112 @@
|
||||||
|
package syncapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StreamPosition represents the offset in the sync stream a client is at.
|
||||||
|
type StreamPosition int64
|
||||||
|
|
||||||
|
// String implements the Stringer interface.
|
||||||
|
func (sp StreamPosition) String() string {
|
||||||
|
return strconv.FormatInt(int64(sp), 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||||
|
type Response struct {
|
||||||
|
NextBatch string `json:"next_batch"`
|
||||||
|
AccountData struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
} `json:"account_data"`
|
||||||
|
Presence struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
} `json:"presence"`
|
||||||
|
Rooms struct {
|
||||||
|
Join map[string]JoinResponse `json:"join"`
|
||||||
|
Invite map[string]InviteResponse `json:"invite"`
|
||||||
|
Leave map[string]LeaveResponse `json:"leave"`
|
||||||
|
} `json:"rooms"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewResponse creates an empty response with initialised maps.
|
||||||
|
func NewResponse() *Response {
|
||||||
|
res := Response{}
|
||||||
|
// Pre-initalise the maps. Synapse will return {} even if there are no rooms under a specific section,
|
||||||
|
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
|
||||||
|
res.Rooms.Join = make(map[string]JoinResponse)
|
||||||
|
res.Rooms.Invite = make(map[string]InviteResponse)
|
||||||
|
res.Rooms.Leave = make(map[string]LeaveResponse)
|
||||||
|
|
||||||
|
// Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
|
||||||
|
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
|
||||||
|
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
|
||||||
|
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
|
||||||
|
res.AccountData.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
res.Presence.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
|
||||||
|
return &res
|
||||||
|
}
|
||||||
|
|
||||||
|
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
||||||
|
type JoinResponse struct {
|
||||||
|
State struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
} `json:"state"`
|
||||||
|
Timeline struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
Limited bool `json:"limited"`
|
||||||
|
PrevBatch string `json:"prev_batch"`
|
||||||
|
} `json:"timeline"`
|
||||||
|
Ephemeral struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
} `json:"ephemeral"`
|
||||||
|
AccountData struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
} `json:"account_data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJoinResponse creates an empty response with initialised arrays.
|
||||||
|
func NewJoinResponse() *JoinResponse {
|
||||||
|
res := JoinResponse{}
|
||||||
|
res.State.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
res.Timeline.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
res.Ephemeral.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
res.AccountData.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
return &res
|
||||||
|
}
|
||||||
|
|
||||||
|
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
|
||||||
|
type InviteResponse struct {
|
||||||
|
InviteState struct {
|
||||||
|
Events []gomatrixserverlib.Event
|
||||||
|
} `json:"invite_state"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewInviteResponse creates an empty response with initialised arrays.
|
||||||
|
func NewInviteResponse() *InviteResponse {
|
||||||
|
res := InviteResponse{}
|
||||||
|
res.InviteState.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
return &res
|
||||||
|
}
|
||||||
|
|
||||||
|
// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
|
||||||
|
type LeaveResponse struct {
|
||||||
|
State struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
} `json:"state"`
|
||||||
|
Timeline struct {
|
||||||
|
Events []gomatrixserverlib.Event `json:"events"`
|
||||||
|
Limited bool `json:"limited"`
|
||||||
|
PrevBatch string `json:"prev_batch"`
|
||||||
|
} `json:"timeline"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLeaveResponse creates an empty response with initialised arrays.
|
||||||
|
func NewLeaveResponse() *LeaveResponse {
|
||||||
|
res := LeaveResponse{}
|
||||||
|
res.State.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
res.Timeline.Events = make([]gomatrixserverlib.Event, 0)
|
||||||
|
return &res
|
||||||
|
}
|
|
@ -6,15 +6,13 @@ import (
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/clientapi/config"
|
"github.com/matrix-org/dendrite/clientapi/config"
|
||||||
"github.com/matrix-org/dendrite/clientapi/storage"
|
"github.com/matrix-org/dendrite/clientapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/sync/syncapi"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// syncStreamPosition represents the offset in the sync stream a client is at.
|
|
||||||
type syncStreamPosition int64
|
|
||||||
|
|
||||||
// Server contains all the logic for running a sync server
|
// Server contains all the logic for running a sync server
|
||||||
type Server struct {
|
type Server struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
|
@ -83,7 +81,7 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}).Panicf("roomserver output log: write event failure")
|
}).Panicf("roomserver output log: write event failure")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.rp.OnNewEvent(&ev, syncStreamPosition(syncStreamPos))
|
s.rp.OnNewEvent(&ev, syncapi.StreamPosition(syncStreamPos))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue