Calculate the complete /sync response (#69)
parent
203e706b99
commit
53ec4a255b
|
@ -29,6 +29,8 @@ CREATE TABLE IF NOT EXISTS current_room_state (
|
||||||
);
|
);
|
||||||
-- for event deletion
|
-- for event deletion
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON current_room_state(event_id);
|
CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON current_room_state(event_id);
|
||||||
|
-- for querying membership states of users
|
||||||
|
CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
||||||
`
|
`
|
||||||
|
|
||||||
const upsertRoomStateSQL = "" +
|
const upsertRoomStateSQL = "" +
|
||||||
|
@ -39,9 +41,17 @@ const upsertRoomStateSQL = "" +
|
||||||
const deleteRoomStateByEventIDSQL = "" +
|
const deleteRoomStateByEventIDSQL = "" +
|
||||||
"DELETE FROM current_room_state WHERE event_id = $1"
|
"DELETE FROM current_room_state WHERE event_id = $1"
|
||||||
|
|
||||||
|
const selectRoomIDsWithMembershipSQL = "" +
|
||||||
|
"SELECT room_id FROM current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
||||||
|
|
||||||
|
const selectCurrentStateSQL = "" +
|
||||||
|
"SELECT event_json FROM current_room_state WHERE room_id = $1"
|
||||||
|
|
||||||
type currentRoomStateStatements struct {
|
type currentRoomStateStatements struct {
|
||||||
upsertRoomStateStmt *sql.Stmt
|
upsertRoomStateStmt *sql.Stmt
|
||||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||||
|
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||||
|
selectCurrentStateStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
@ -55,9 +65,58 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
|
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||||
|
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(txn *sql.Tx, userID, membership string) ([]string, error) {
|
||||||
|
rows, err := txn.Stmt(s.selectRoomIDsWithMembershipStmt).Query(userID, membership)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var result []string
|
||||||
|
for rows.Next() {
|
||||||
|
var roomID string
|
||||||
|
if err := rows.Scan(&roomID); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, roomID)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CurrentState returns all the current state events for the given room.
|
||||||
|
func (s *currentRoomStateStatements) CurrentState(txn *sql.Tx, roomID string) ([]gomatrixserverlib.Event, error) {
|
||||||
|
rows, err := txn.Stmt(s.selectCurrentStateStmt).Query(roomID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var result []gomatrixserverlib.Event
|
||||||
|
for rows.Next() {
|
||||||
|
var eventBytes []byte
|
||||||
|
if err := rows.Scan(&eventBytes); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// TODO: Handle redacted events
|
||||||
|
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result = append(result, ev)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *currentRoomStateStatements) UpdateRoomState(txn *sql.Tx, added []gomatrixserverlib.Event, removedEventIDs []string) error {
|
func (s *currentRoomStateStatements) UpdateRoomState(txn *sql.Tx, added []gomatrixserverlib.Event, removedEventIDs []string) error {
|
||||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||||
for _, eventID := range removedEventIDs {
|
for _, eventID := range removedEventIDs {
|
||||||
|
|
|
@ -39,6 +39,9 @@ const selectEventsSQL = "" +
|
||||||
const selectEventsInRangeSQL = "" +
|
const selectEventsInRangeSQL = "" +
|
||||||
"SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2"
|
"SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2"
|
||||||
|
|
||||||
|
const selectRecentEventsSQL = "" +
|
||||||
|
"SELECT event_json FROM output_room_events WHERE room_id = $1 ORDER BY id DESC LIMIT $2"
|
||||||
|
|
||||||
const selectMaxIDSQL = "" +
|
const selectMaxIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM output_room_events"
|
"SELECT MAX(id) FROM output_room_events"
|
||||||
|
|
||||||
|
@ -47,6 +50,7 @@ type outputRoomEventsStatements struct {
|
||||||
selectEventsStmt *sql.Stmt
|
selectEventsStmt *sql.Stmt
|
||||||
selectMaxIDStmt *sql.Stmt
|
selectMaxIDStmt *sql.Stmt
|
||||||
selectEventsInRangeStmt *sql.Stmt
|
selectEventsInRangeStmt *sql.Stmt
|
||||||
|
selectRecentEventsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
@ -66,14 +70,22 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectEventsInRangeStmt, err = db.Prepare(selectEventsInRangeSQL); err != nil {
|
if s.selectEventsInRangeStmt, err = db.Prepare(selectEventsInRangeSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxID returns the ID of the last inserted event in this table. This should only ever be used at startup, as it will
|
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
|
||||||
// race with inserting events if it is done afterwards. If there are no inserted events, 0 is returned.
|
// then this function should only ever be used at startup, as it will race with inserting events if it is
|
||||||
func (s *outputRoomEventsStatements) MaxID() (id int64, err error) {
|
// done afterwards. If there are no inserted events, 0 is returned.
|
||||||
|
func (s *outputRoomEventsStatements) MaxID(txn *sql.Tx) (id int64, err error) {
|
||||||
|
stmt := s.selectMaxIDStmt
|
||||||
|
if txn != nil {
|
||||||
|
stmt = txn.Stmt(stmt)
|
||||||
|
}
|
||||||
var nullableID sql.NullInt64
|
var nullableID sql.NullInt64
|
||||||
err = s.selectMaxIDStmt.QueryRow().Scan(&nullableID)
|
err = stmt.QueryRow().Scan(&nullableID)
|
||||||
if nullableID.Valid {
|
if nullableID.Valid {
|
||||||
id = nullableID.Int64
|
id = nullableID.Int64
|
||||||
}
|
}
|
||||||
|
@ -89,23 +101,14 @@ func (s *outputRoomEventsStatements) InRange(oldPos, newPos int64) ([]gomatrixse
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
var result []gomatrixserverlib.Event
|
result, err := rowsToEvents(rows)
|
||||||
var i int64
|
if err != nil {
|
||||||
for ; rows.Next(); i++ {
|
return nil, err
|
||||||
var eventBytes []byte
|
|
||||||
if err := rows.Scan(&eventBytes); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
result = append(result, ev)
|
|
||||||
}
|
}
|
||||||
// Expect one event per position, exclusive of old. eg old=3, new=5, expect 4,5 so 2 events.
|
// Expect one event per position, exclusive of old. eg old=3, new=5, expect 4,5 so 2 events.
|
||||||
wantNum := (newPos - oldPos)
|
wantNum := int(newPos - oldPos)
|
||||||
if i != wantNum {
|
if len(result) != wantNum {
|
||||||
return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", i, wantNum)
|
return nil, fmt.Errorf("failed to map all positions to events: (got %d, wanted, %d)", len(result), wantNum)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
@ -119,6 +122,16 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
|
||||||
|
func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, limit int) ([]gomatrixserverlib.Event, error) {
|
||||||
|
rows, err := s.selectRecentEventsStmt.Query(roomID, limit)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
return rowsToEvents(rows)
|
||||||
|
}
|
||||||
|
|
||||||
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing
|
||||||
// from the database.
|
// from the database.
|
||||||
func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) {
|
func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) {
|
||||||
|
@ -127,22 +140,30 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
result, err := rowsToEvents(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(result) != len(eventIDs) {
|
||||||
|
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", len(result), len(eventIDs))
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
||||||
var result []gomatrixserverlib.Event
|
var result []gomatrixserverlib.Event
|
||||||
i := 0
|
for rows.Next() {
|
||||||
for ; rows.Next(); i++ {
|
|
||||||
var eventBytes []byte
|
var eventBytes []byte
|
||||||
if err := rows.Scan(&eventBytes); err != nil {
|
if err := rows.Scan(&eventBytes); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// TODO: Handle redacted events
|
||||||
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
result = append(result, ev)
|
result = append(result, ev)
|
||||||
}
|
}
|
||||||
if i != len(eventIDs) {
|
|
||||||
return nil, fmt.Errorf("failed to map all event IDs to events: (got %d, wanted %d)", i, len(eventIDs))
|
|
||||||
}
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,13 +89,52 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o
|
||||||
|
|
||||||
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
||||||
func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) {
|
func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) {
|
||||||
id, err := d.events.MaxID()
|
id, err := d.events.MaxID(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.StreamPosition(0), err
|
return types.StreamPosition(0), err
|
||||||
}
|
}
|
||||||
return types.StreamPosition(id), nil
|
return types.StreamPosition(id), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CompleteSync returns all the data needed in order to create a complete sync response.
|
||||||
|
func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) {
|
||||||
|
data = make(map[string]types.RoomData)
|
||||||
|
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
|
||||||
|
// a consistent view of the database throughout. This includes extracting the sync stream position.
|
||||||
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
// Get the current stream position which we will base the sync response on.
|
||||||
|
id, err := d.events.MaxID(txn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pos = types.StreamPosition(id)
|
||||||
|
|
||||||
|
// Extract room state and recent events for all rooms the user is joined to.
|
||||||
|
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, roomID := range roomIDs {
|
||||||
|
stateEvents, err := d.roomstate.CurrentState(txn, roomID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
||||||
|
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
||||||
|
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, numRecentEventsPerRoom)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
data[roomID] = types.RoomData{
|
||||||
|
State: stateEvents,
|
||||||
|
RecentEvents: recentEvents,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos.
|
// EventsInRange returns all events in the given range, exclusive of oldPos, inclusive of newPos.
|
||||||
func (d *SyncServerDatabase) EventsInRange(oldPos, newPos types.StreamPosition) ([]gomatrixserverlib.Event, error) {
|
func (d *SyncServerDatabase) EventsInRange(oldPos, newPos types.StreamPosition) ([]gomatrixserverlib.Event, error) {
|
||||||
return d.events.InRange(int64(oldPos), int64(newPos))
|
return d.events.InRange(int64(oldPos), int64(newPos))
|
||||||
|
|
|
@ -139,6 +139,23 @@ func (rp *RequestPool) waitForEvents(req syncRequest) types.StreamPosition {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) {
|
||||||
|
if req.since == types.StreamPosition(0) {
|
||||||
|
pos, data, err := rp.db.CompleteSync(req.userID, 3)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res := types.NewResponse()
|
||||||
|
res.NextBatch = pos.String()
|
||||||
|
for roomID, d := range data {
|
||||||
|
jr := types.NewJoinResponse()
|
||||||
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
jr.Timeline.Limited = true
|
||||||
|
jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync)
|
||||||
|
res.Rooms.Join[roomID] = *jr
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
currentPos := rp.waitForEvents(req)
|
currentPos := rp.waitForEvents(req)
|
||||||
|
|
||||||
// TODO: handle ignored users
|
// TODO: handle ignored users
|
||||||
|
@ -153,16 +170,16 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, err
|
||||||
// c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch?
|
// c) Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // Synapse has a TODO: How do we handle ban -> leave in same batch?
|
||||||
// 4) Add joined rooms (joined room list)
|
// 4) Add joined rooms (joined room list)
|
||||||
|
|
||||||
events, err := rp.db.EventsInRange(req.since, currentPos)
|
evs, err := rp.db.EventsInRange(req.since, currentPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res := types.NewResponse()
|
res := types.NewResponse()
|
||||||
// for now, dump everything as join timeline events
|
// for now, dump everything as join timeline events
|
||||||
for _, ev := range events {
|
for _, ev := range evs {
|
||||||
roomData := res.Rooms.Join[ev.RoomID()]
|
roomData := res.Rooms.Join[ev.RoomID()]
|
||||||
roomData.Timeline.Events = append(roomData.Timeline.Events, ev)
|
roomData.Timeline.Events = append(roomData.Timeline.Events, gomatrixserverlib.ToClientEvent(ev, gomatrixserverlib.FormatSync))
|
||||||
res.Rooms.Join[ev.RoomID()] = roomData
|
res.Rooms.Join[ev.RoomID()] = roomData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
// StreamPosition represents the offset in the sync stream a client is at.
|
// StreamPosition represents the offset in the sync stream a client is at.
|
||||||
|
@ -14,14 +13,20 @@ func (sp StreamPosition) String() string {
|
||||||
return strconv.FormatInt(int64(sp), 10)
|
return strconv.FormatInt(int64(sp), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RoomData represents the data for a room suitable for building a sync response from.
|
||||||
|
type RoomData struct {
|
||||||
|
State []gomatrixserverlib.Event
|
||||||
|
RecentEvents []gomatrixserverlib.Event
|
||||||
|
}
|
||||||
|
|
||||||
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||||
type Response struct {
|
type Response struct {
|
||||||
NextBatch string `json:"next_batch"`
|
NextBatch string `json:"next_batch"`
|
||||||
AccountData struct {
|
AccountData struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
} `json:"account_data"`
|
} `json:"account_data"`
|
||||||
Presence struct {
|
Presence struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
} `json:"presence"`
|
} `json:"presence"`
|
||||||
Rooms struct {
|
Rooms struct {
|
||||||
Join map[string]JoinResponse `json:"join"`
|
Join map[string]JoinResponse `json:"join"`
|
||||||
|
@ -43,8 +48,8 @@ func NewResponse() *Response {
|
||||||
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
|
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
|
||||||
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
|
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
|
||||||
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
|
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
|
||||||
res.AccountData.Events = make([]gomatrixserverlib.Event, 0)
|
res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
res.Presence.Events = make([]gomatrixserverlib.Event, 0)
|
res.Presence.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
|
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
@ -52,61 +57,61 @@ func NewResponse() *Response {
|
||||||
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
||||||
type JoinResponse struct {
|
type JoinResponse struct {
|
||||||
State struct {
|
State struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
} `json:"state"`
|
} `json:"state"`
|
||||||
Timeline struct {
|
Timeline struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
Limited bool `json:"limited"`
|
Limited bool `json:"limited"`
|
||||||
PrevBatch string `json:"prev_batch"`
|
PrevBatch string `json:"prev_batch"`
|
||||||
} `json:"timeline"`
|
} `json:"timeline"`
|
||||||
Ephemeral struct {
|
Ephemeral struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
} `json:"ephemeral"`
|
} `json:"ephemeral"`
|
||||||
AccountData struct {
|
AccountData struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
} `json:"account_data"`
|
} `json:"account_data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJoinResponse creates an empty response with initialised arrays.
|
// NewJoinResponse creates an empty response with initialised arrays.
|
||||||
func NewJoinResponse() *JoinResponse {
|
func NewJoinResponse() *JoinResponse {
|
||||||
res := JoinResponse{}
|
res := JoinResponse{}
|
||||||
res.State.Events = make([]gomatrixserverlib.Event, 0)
|
res.State.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
res.Timeline.Events = make([]gomatrixserverlib.Event, 0)
|
res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
res.Ephemeral.Events = make([]gomatrixserverlib.Event, 0)
|
res.Ephemeral.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
res.AccountData.Events = make([]gomatrixserverlib.Event, 0)
|
res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
||||||
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
|
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
|
||||||
type InviteResponse struct {
|
type InviteResponse struct {
|
||||||
InviteState struct {
|
InviteState struct {
|
||||||
Events []gomatrixserverlib.Event
|
Events []gomatrixserverlib.ClientEvent
|
||||||
} `json:"invite_state"`
|
} `json:"invite_state"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInviteResponse creates an empty response with initialised arrays.
|
// NewInviteResponse creates an empty response with initialised arrays.
|
||||||
func NewInviteResponse() *InviteResponse {
|
func NewInviteResponse() *InviteResponse {
|
||||||
res := InviteResponse{}
|
res := InviteResponse{}
|
||||||
res.InviteState.Events = make([]gomatrixserverlib.Event, 0)
|
res.InviteState.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
||||||
// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
|
// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
|
||||||
type LeaveResponse struct {
|
type LeaveResponse struct {
|
||||||
State struct {
|
State struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
} `json:"state"`
|
} `json:"state"`
|
||||||
Timeline struct {
|
Timeline struct {
|
||||||
Events []gomatrixserverlib.Event `json:"events"`
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
||||||
Limited bool `json:"limited"`
|
Limited bool `json:"limited"`
|
||||||
PrevBatch string `json:"prev_batch"`
|
PrevBatch string `json:"prev_batch"`
|
||||||
} `json:"timeline"`
|
} `json:"timeline"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLeaveResponse creates an empty response with initialised arrays.
|
// NewLeaveResponse creates an empty response with initialised arrays.
|
||||||
func NewLeaveResponse() *LeaveResponse {
|
func NewLeaveResponse() *LeaveResponse {
|
||||||
res := LeaveResponse{}
|
res := LeaveResponse{}
|
||||||
res.State.Events = make([]gomatrixserverlib.Event, 0)
|
res.State.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
res.Timeline.Events = make([]gomatrixserverlib.Event, 0)
|
res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@
|
||||||
{
|
{
|
||||||
"importpath": "github.com/matrix-org/gomatrixserverlib",
|
"importpath": "github.com/matrix-org/gomatrixserverlib",
|
||||||
"repository": "https://github.com/matrix-org/gomatrixserverlib",
|
"repository": "https://github.com/matrix-org/gomatrixserverlib",
|
||||||
"revision": "e757e4f7f675c7356d4b2953059718f1b4598a08",
|
"revision": "8e847d5bdb5cc0dd11e0846188eb403b70ae6bb3",
|
||||||
"branch": "master"
|
"branch": "master"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
/* Copyright 2017 Vector Creations Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package gomatrixserverlib
|
||||||
|
|
||||||
|
// EventFormat specifies the format of a client event
|
||||||
|
type EventFormat int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// FormatAll will include all client event keys
|
||||||
|
FormatAll EventFormat = iota
|
||||||
|
// FormatSync will include only the event keys required by the /sync API. Notably, this
|
||||||
|
// means the 'room_id' will be missing from the events.
|
||||||
|
FormatSync
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClientEvent is an event which is fit for consumption by clients, in accordance with the specification.
|
||||||
|
type ClientEvent struct {
|
||||||
|
Content rawJSON `json:"content"`
|
||||||
|
EventID string `json:"event_id"`
|
||||||
|
OriginServerTS int64 `json:"origin_server_ts"`
|
||||||
|
// RoomID is omitted on /sync responses
|
||||||
|
RoomID string `json:"room_id,omitempty"`
|
||||||
|
Sender string `json:"sender"`
|
||||||
|
StateKey *string `json:"state_key,omitempty"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Unsigned rawJSON `json:"unsigned,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToClientEvents converts server events to client events.
|
||||||
|
func ToClientEvents(serverEvs []Event, format EventFormat) []ClientEvent {
|
||||||
|
evs := make([]ClientEvent, len(serverEvs))
|
||||||
|
for i, se := range serverEvs {
|
||||||
|
evs[i] = ToClientEvent(se, format)
|
||||||
|
}
|
||||||
|
return evs
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToClientEvent converts a single server event to a client event.
|
||||||
|
func ToClientEvent(se Event, format EventFormat) ClientEvent {
|
||||||
|
ce := ClientEvent{
|
||||||
|
Content: rawJSON(se.Content()),
|
||||||
|
Sender: se.Sender(),
|
||||||
|
Type: se.Type(),
|
||||||
|
StateKey: se.StateKey(),
|
||||||
|
Unsigned: rawJSON(se.Unsigned()),
|
||||||
|
OriginServerTS: se.OriginServerTS(),
|
||||||
|
EventID: se.EventID(),
|
||||||
|
}
|
||||||
|
if format == FormatAll {
|
||||||
|
ce.RoomID = se.RoomID()
|
||||||
|
}
|
||||||
|
return ce
|
||||||
|
}
|
|
@ -0,0 +1,103 @@
|
||||||
|
/* Copyright 2017 Vector Creations Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package gomatrixserverlib
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestToClientEvent(t *testing.T) {
|
||||||
|
ev, err := NewEventFromTrustedJSON([]byte(`{
|
||||||
|
"type": "m.room.name",
|
||||||
|
"state_key": "",
|
||||||
|
"event_id": "$test:localhost",
|
||||||
|
"room_id": "!test:localhost",
|
||||||
|
"sender": "@test:localhost",
|
||||||
|
"content": {
|
||||||
|
"name": "Hello World"
|
||||||
|
},
|
||||||
|
"origin_server_ts": 123456,
|
||||||
|
"unsigned": {
|
||||||
|
"prev_content": {
|
||||||
|
"name": "Goodbye World"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`), false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create Event: %s", err)
|
||||||
|
}
|
||||||
|
ce := ToClientEvent(ev, FormatAll)
|
||||||
|
if ce.EventID != ev.EventID() {
|
||||||
|
t.Errorf("ClientEvent.EventID: wanted %s, got %s", ev.EventID(), ce.EventID)
|
||||||
|
}
|
||||||
|
if ce.OriginServerTS != ev.OriginServerTS() {
|
||||||
|
t.Errorf("ClientEvent.OriginServerTS: wanted %d, got %d", ev.OriginServerTS(), ce.OriginServerTS)
|
||||||
|
}
|
||||||
|
if ce.StateKey == nil || *ce.StateKey != "" {
|
||||||
|
t.Errorf("ClientEvent.StateKey: wanted '', got %v", ce.StateKey)
|
||||||
|
}
|
||||||
|
if ce.Type != ev.Type() {
|
||||||
|
t.Errorf("ClientEvent.Type: wanted %s, got %s", ev.Type(), ce.Type)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(ce.Content, ev.Content()) {
|
||||||
|
t.Errorf("ClientEvent.Content: wanted %s, got %s", string(ev.Content()), string(ce.Content))
|
||||||
|
}
|
||||||
|
if !bytes.Equal(ce.Unsigned, ev.Unsigned()) {
|
||||||
|
t.Errorf("ClientEvent.Unsigned: wanted %s, got %s", string(ev.Unsigned()), string(ce.Unsigned))
|
||||||
|
}
|
||||||
|
if ce.Sender != ev.Sender() {
|
||||||
|
t.Errorf("ClientEvent.Sender: wanted %s, got %s", ev.Sender(), ce.Sender)
|
||||||
|
}
|
||||||
|
j, err := json.Marshal(ce)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to Marshal ClientEvent: %s", err)
|
||||||
|
}
|
||||||
|
// Marshal sorts keys in structs by the order they are defined in the struct, which is alphabetical
|
||||||
|
out := `{"content":{"name":"Hello World"},"event_id":"$test:localhost","origin_server_ts":123456,` +
|
||||||
|
`"room_id":"!test:localhost","sender":"@test:localhost","state_key":"","type":"m.room.name",` +
|
||||||
|
`"unsigned":{"prev_content":{"name":"Goodbye World"}}}`
|
||||||
|
if !bytes.Equal([]byte(out), j) {
|
||||||
|
t.Errorf("ClientEvent marshalled to wrong bytes: wanted %s, got %s", out, string(j))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToClientFormatSync(t *testing.T) {
|
||||||
|
ev, err := NewEventFromTrustedJSON([]byte(`{
|
||||||
|
"type": "m.room.name",
|
||||||
|
"state_key": "",
|
||||||
|
"event_id": "$test:localhost",
|
||||||
|
"room_id": "!test:localhost",
|
||||||
|
"sender": "@test:localhost",
|
||||||
|
"content": {
|
||||||
|
"name": "Hello World"
|
||||||
|
},
|
||||||
|
"origin_server_ts": 123456,
|
||||||
|
"unsigned": {
|
||||||
|
"prev_content": {
|
||||||
|
"name": "Goodbye World"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`), false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create Event: %s", err)
|
||||||
|
}
|
||||||
|
ce := ToClientEvent(ev, FormatSync)
|
||||||
|
if ce.RoomID != "" {
|
||||||
|
t.Errorf("ClientEvent.RoomID: wanted '', got %s", ce.RoomID)
|
||||||
|
}
|
||||||
|
}
|
|
@ -88,16 +88,18 @@ type Event struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventFields struct {
|
type eventFields struct {
|
||||||
RoomID string `json:"room_id"`
|
RoomID string `json:"room_id"`
|
||||||
EventID string `json:"event_id"`
|
EventID string `json:"event_id"`
|
||||||
Sender string `json:"sender"`
|
Sender string `json:"sender"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
StateKey *string `json:"state_key"`
|
StateKey *string `json:"state_key"`
|
||||||
Content rawJSON `json:"content"`
|
Content rawJSON `json:"content"`
|
||||||
PrevEvents []EventReference `json:"prev_events"`
|
PrevEvents []EventReference `json:"prev_events"`
|
||||||
AuthEvents []EventReference `json:"auth_events"`
|
AuthEvents []EventReference `json:"auth_events"`
|
||||||
Redacts string `json:"redacts"`
|
Redacts string `json:"redacts"`
|
||||||
Depth int64 `json:"depth"`
|
Depth int64 `json:"depth"`
|
||||||
|
Unsigned rawJSON `json:"unsigned"`
|
||||||
|
OriginServerTS int64 `json:"origin_server_ts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var emptyEventReferenceList = []EventReference{}
|
var emptyEventReferenceList = []EventReference{}
|
||||||
|
@ -308,6 +310,16 @@ func (e Event) Type() string {
|
||||||
return e.fields.Type
|
return e.fields.Type
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OriginServerTS returns the unix timestamp when this event was created on the origin server, with millisecond resolution.
|
||||||
|
func (e Event) OriginServerTS() int64 {
|
||||||
|
return e.fields.OriginServerTS
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsigned returns the object under the 'unsigned' key of the event.
|
||||||
|
func (e Event) Unsigned() []byte {
|
||||||
|
return []byte(e.fields.Unsigned)
|
||||||
|
}
|
||||||
|
|
||||||
// Content returns the content JSON of the event.
|
// Content returns the content JSON of the event.
|
||||||
func (e Event) Content() []byte {
|
func (e Event) Content() []byte {
|
||||||
return []byte(e.fields.Content)
|
return []byte(e.fields.Content)
|
||||||
|
|
Loading…
Reference in New Issue