Implement room state retrieval endpoint (#254)
* Move prev event structure to component-wide types * Renamed key for better understandability * Implement /state endpoint * Change rowsToEvents() to return an empty slice instead of nil in case of empty result set * Doc * Fix forgotten comma * Specify HTTP method * Update comment * Remove debug fmt.Println
This commit is contained in:
parent
b9a933846b
commit
0218063339
6 changed files with 88 additions and 13 deletions
|
@ -38,12 +38,6 @@ type OutputRoomEvent struct {
|
|||
query api.RoomserverQueryAPI
|
||||
}
|
||||
|
||||
type prevEventRef struct {
|
||||
PrevContent json.RawMessage `json:"prev_content"`
|
||||
PrevID string `json:"replaces_state"`
|
||||
UserID string `json:"prev_sender"`
|
||||
}
|
||||
|
||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputRoomEvent(
|
||||
cfg *config.Dendrite,
|
||||
|
@ -267,10 +261,10 @@ func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomat
|
|||
return event, nil
|
||||
}
|
||||
|
||||
prev := prevEventRef{
|
||||
PrevContent: prevEvent.Content(),
|
||||
PrevID: prevEvent.EventID(),
|
||||
UserID: prevEvent.Sender(),
|
||||
prev := types.PrevEventRef{
|
||||
PrevContent: prevEvent.Content(),
|
||||
ReplacesState: prevEvent.EventID(),
|
||||
PrevSender: prevEvent.Sender(),
|
||||
}
|
||||
|
||||
return event.SetUnsigned(prev)
|
||||
|
|
|
@ -30,7 +30,13 @@ const pathPrefixR0 = "/_matrix/client/r0"
|
|||
// Setup configures the given mux with sync-server listeners
|
||||
func Setup(apiMux *mux.Router, srp *sync.RequestPool, deviceDB *devices.Database) {
|
||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||
|
||||
r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
return srp.OnIncomingSyncRequest(req, device)
|
||||
})).Methods("GET")
|
||||
|
||||
r0mux.Handle("/rooms/{roomID}/state", common.MakeAuthAPI("room_state", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return srp.OnIncomingStateRequest(req, vars["roomID"])
|
||||
})).Methods("GET")
|
||||
}
|
||||
|
|
|
@ -216,7 +216,7 @@ func (s *currentRoomStateStatements) selectEventsWithEventIDs(
|
|||
}
|
||||
|
||||
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
|
||||
var result []gomatrixserverlib.Event
|
||||
result := []gomatrixserverlib.Event{}
|
||||
for rows.Next() {
|
||||
var eventBytes []byte
|
||||
if err := rows.Scan(&eventBytes); err != nil {
|
||||
|
|
|
@ -162,6 +162,19 @@ func (d *SyncServerDatabase) GetStateEvent(
|
|||
return d.roomstate.selectStateEvent(ctx, evType, roomID, stateKey)
|
||||
}
|
||||
|
||||
// GetStateEventsForRoom fetches the state events for a given room.
|
||||
// Returns an empty slice if no state events could be found for this room.
|
||||
// Returns an error if there was an issue with the retrieval.
|
||||
func (d *SyncServerDatabase) GetStateEventsForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (stateEvents []gomatrixserverlib.Event, err error) {
|
||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
||||
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
|
||||
return d.syncStreamPositionTx(ctx, nil)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
|
@ -105,6 +106,56 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
|||
}
|
||||
}
|
||||
|
||||
type stateEventInStateResp struct {
|
||||
gomatrixserverlib.ClientEvent
|
||||
PrevContent json.RawMessage `json:"prev_content,omitempty"`
|
||||
ReplacesState string `json:"replaces_state,omitempty"`
|
||||
}
|
||||
|
||||
// OnIncomingStateRequest is called when a client makes a /rooms/{roomID}/state
|
||||
// request. It will fetch all the state events from the specified room and will
|
||||
// append the necessary keys to them if applicable before returning them.
|
||||
// Returns an error if something went wrong in the process.
|
||||
// TODO: Check if the user is in the room. If not, check if the room's history
|
||||
// is publicly visible. Current behaviour is returning an empty array if the
|
||||
// user cannot see the room's history.
|
||||
func (rp *RequestPool) OnIncomingStateRequest(req *http.Request, roomID string) util.JSONResponse {
|
||||
stateEvents, err := rp.db.GetStateEventsForRoom(req.Context(), roomID)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
resp := []stateEventInStateResp{}
|
||||
// Fill the prev_content and replaces_state keys if necessary
|
||||
for _, event := range stateEvents {
|
||||
stateEvent := stateEventInStateResp{
|
||||
ClientEvent: gomatrixserverlib.ToClientEvent(event, gomatrixserverlib.FormatAll),
|
||||
}
|
||||
var prevEventRef types.PrevEventRef
|
||||
if len(event.Unsigned()) > 0 {
|
||||
if err := json.Unmarshal(event.Unsigned(), &prevEventRef); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
// Fills the previous state event ID if the state event replaces another
|
||||
// state event
|
||||
if len(prevEventRef.ReplacesState) > 0 {
|
||||
stateEvent.ReplacesState = prevEventRef.ReplacesState
|
||||
}
|
||||
// Fill the previous event if the state event references a previous event
|
||||
if prevEventRef.PrevContent != nil {
|
||||
stateEvent.PrevContent = prevEventRef.PrevContent
|
||||
}
|
||||
}
|
||||
|
||||
resp = append(resp, stateEvent)
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: 200,
|
||||
JSON: resp,
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) {
|
||||
// TODO: handle ignored users
|
||||
if req.since == types.StreamPosition(0) {
|
||||
|
@ -116,8 +167,11 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.Stre
|
|||
func (rp *RequestPool) appendAccountData(
|
||||
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
|
||||
) (*types.Response, error) {
|
||||
// TODO: We currently send all account data on every sync response, we should instead send data
|
||||
// that has changed on incremental sync responses
|
||||
// TODO: Account data doesn't have a sync position of its own, meaning that
|
||||
// account data might be sent multiple time to the client if multiple account
|
||||
// data keys were set between two message. This isn't a huge issue since the
|
||||
// duplicate data doesn't represent a huge quantity of data, but an optimisation
|
||||
// here would be making sure each data is sent only once to the client.
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
@ -28,6 +29,13 @@ func (sp StreamPosition) String() string {
|
|||
return strconv.FormatInt(int64(sp), 10)
|
||||
}
|
||||
|
||||
// PrevEventRef represents a reference to a previous event in a state event upgrade
|
||||
type PrevEventRef struct {
|
||||
PrevContent json.RawMessage `json:"prev_content"`
|
||||
ReplacesState string `json:"replaces_state"`
|
||||
PrevSender string `json:"prev_sender"`
|
||||
}
|
||||
|
||||
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
||||
type Response struct {
|
||||
NextBatch string `json:"next_batch"`
|
||||
|
|
Loading…
Reference in a new issue