Add API for querying the state after a list of events (#123)

* Add API for querying the state after a list of events

* Implement state after events API

* Actually implement the API

* Reorder .travis.yml

* Comments

* Use common.MakeAPI
main
Mark Haines 2017-05-30 17:44:31 +01:00 committed by GitHub
parent e21cd5ae66
commit 65b66a6452
4 changed files with 217 additions and 27 deletions

View File

@ -17,9 +17,9 @@ install:
- go get github.com/constabulary/gb/... - go get github.com/constabulary/gb/...
- go get github.com/golang/lint/golint - go get github.com/golang/lint/golint
- go get github.com/fzipp/gocyclo - go get github.com/fzipp/gocyclo
- ./travis-install-kafka.sh
script: script:
- ./travis-install-kafka.sh
- ./travis-test.sh - ./travis-test.sh
notifications: notifications:

View File

@ -44,6 +44,30 @@ type QueryLatestEventsAndStateResponse struct {
StateEvents []gomatrixserverlib.Event StateEvents []gomatrixserverlib.Event
} }
// QueryStateAfterEventsRequest is a request to QueryStateAfterEvents
type QueryStateAfterEventsRequest struct {
// The room ID to query the state in.
RoomID string
// The list of previous events to return the events after.
PrevEventIDs []string
// The state key tuples to fetch from the state
StateToFetch []gomatrixserverlib.StateKeyTuple
}
// QueryStateAfterEventsResponse is a response to QueryStateAfterEvents
type QueryStateAfterEventsResponse struct {
// Copy of the request for debugging.
QueryStateAfterEventsRequest
// Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool
// Do all the previous events exist on this roomserver?
// If some of previous events do not exist this will be false and StateEvents will be empty.
PrevEventsExist bool
// The state events requested.
StateEvents []gomatrixserverlib.Event
}
// RoomserverQueryAPI is used to query information from the room server. // RoomserverQueryAPI is used to query information from the room server.
type RoomserverQueryAPI interface { type RoomserverQueryAPI interface {
// Query the latest events and state for a room from the room server. // Query the latest events and state for a room from the room server.
@ -51,11 +75,20 @@ type RoomserverQueryAPI interface {
request *QueryLatestEventsAndStateRequest, request *QueryLatestEventsAndStateRequest,
response *QueryLatestEventsAndStateResponse, response *QueryLatestEventsAndStateResponse,
) error ) error
// Query the state after a list of events in a room from the room server.
QueryStateAfterEvents(
request *QueryStateAfterEventsRequest,
response *QueryStateAfterEventsResponse,
) error
} }
// RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API. // RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API.
const RoomserverQueryLatestEventsAndStatePath = "/api/roomserver/QueryLatestEventsAndState" const RoomserverQueryLatestEventsAndStatePath = "/api/roomserver/QueryLatestEventsAndState"
// RoomserverQueryStateAfterEventsPath is the HTTP path for the QueryStateAfterEvents API.
const RoomserverQueryStateAfterEventsPath = "/api/roomserver/QueryStateAfterEvents"
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil then it uses the http.DefaultClient // If httpClient is nil then it uses the http.DefaultClient
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI { func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI {
@ -79,6 +112,15 @@ func (h *httpRoomserverQueryAPI) QueryLatestEventsAndState(
return postJSON(h.httpClient, apiURL, request, response) return postJSON(h.httpClient, apiURL, request, response)
} }
// QueryStateAfterEvents implements RoomserverQueryAPI
func (h *httpRoomserverQueryAPI) QueryStateAfterEvents(
request *QueryStateAfterEventsRequest,
response *QueryStateAfterEventsResponse,
) error {
apiURL := h.roomserverURL + RoomserverQueryStateAfterEventsPath
return postJSON(h.httpClient, apiURL, request, response)
}
func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error { func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error {
jsonBytes, err := json.Marshal(request) jsonBytes, err := json.Marshal(request)
if err != nil { if err != nil {

View File

@ -16,12 +16,12 @@ package query
import ( import (
"encoding/json" "encoding/json"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"net/http" "net/http"
) )
@ -68,6 +68,54 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
return err return err
} }
stateEvents, err := r.loadStateEvents(stateEntries)
if err != nil {
return err
}
response.StateEvents = stateEvents
return nil
}
// QueryStateAfterEvents implements api.RoomserverQueryAPI
func (r *RoomserverQueryAPI) QueryStateAfterEvents(
request *api.QueryStateAfterEventsRequest,
response *api.QueryStateAfterEventsResponse,
) (err error) {
response.QueryStateAfterEventsRequest = *request
roomNID, err := r.DB.RoomNID(request.RoomID)
if err != nil {
return err
}
if roomNID == 0 {
return nil
}
response.RoomExists = true
prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs)
if err != nil {
// TODO: Check if the error was because we are missing events from the
// database or are missing state at events from the database.
return err
}
response.PrevEventsExist = true
// Lookup the currrent state for the requested tuples.
stateEntries, err := state.LoadStateAfterEventsForStringTuples(r.DB, prevStates, request.StateToFetch)
if err != nil {
return err
}
stateEvents, err := r.loadStateEvents(stateEntries)
if err != nil {
return err
}
response.StateEvents = stateEvents
return nil
}
func (r *RoomserverQueryAPI) loadStateEvents(stateEntries []types.StateEntry) ([]gomatrixserverlib.Event, error) {
eventNIDs := make([]types.EventNID, len(stateEntries)) eventNIDs := make([]types.EventNID, len(stateEntries))
for i := range stateEntries { for i := range stateEntries {
eventNIDs[i] = stateEntries[i].EventNID eventNIDs[i] = stateEntries[i].EventNID
@ -75,21 +123,21 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
stateEvents, err := r.DB.Events(eventNIDs) stateEvents, err := r.DB.Events(eventNIDs)
if err != nil { if err != nil {
return err return nil, err
} }
response.StateEvents = make([]gomatrixserverlib.Event, len(stateEvents)) result := make([]gomatrixserverlib.Event, len(stateEvents))
for i := range stateEvents { for i := range stateEvents {
response.StateEvents[i] = stateEvents[i].Event result[i] = stateEvents[i].Event
} }
return nil return result, nil
} }
// SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux. // SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux.
func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle( servMux.Handle(
api.RoomserverQueryLatestEventsAndStatePath, api.RoomserverQueryLatestEventsAndStatePath,
makeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse { common.MakeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse {
var request api.QueryLatestEventsAndStateRequest var request api.QueryLatestEventsAndStateRequest
var response api.QueryLatestEventsAndStateResponse var response api.QueryLatestEventsAndStateResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
@ -101,8 +149,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) {
return util.JSONResponse{Code: 200, JSON: &response} return util.JSONResponse{Code: 200, JSON: &response}
}), }),
) )
servMux.Handle(
api.RoomserverQueryStateAfterEventsPath,
common.MakeAPI("query_state_after_events", func(req *http.Request) util.JSONResponse {
var request api.QueryStateAfterEventsRequest
var response api.QueryStateAfterEventsResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
} }
if err := r.QueryStateAfterEvents(&request, &response); err != nil {
func makeAPI(metric string, apiFunc func(req *http.Request) util.JSONResponse) http.Handler { return util.ErrorResponse(err)
return prometheus.InstrumentHandler(metric, util.MakeJSONAPI(util.NewJSONRequestHandler(apiFunc))) }
return util.JSONResponse{Code: 200, JSON: &response}
}),
)
} }

View File

@ -316,6 +316,82 @@ func loadStateAtSnapshotForNumericTuples(
return fullState, nil return fullState, nil
} }
// LoadStateAfterEventsForStringTuples loads the state for a list of event type
// and state key pairs after list of events.
// This is used when we only want to load a subset of the room state after a list of events.
// If there is no entry for a given event type and state key pair then it will be discarded.
// This is typically the state before an event.
// Returns a sorted list of state entries or an error if there was a problem talking to the database.
func LoadStateAfterEventsForStringTuples(
db RoomStateDatabase, prevStates []types.StateAtEvent, stateKeyTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateEntry, error) {
numericTuples, err := stringTuplesToNumericTuples(db, stateKeyTuples)
if err != nil {
return nil, err
}
return loadStateAfterEventsForNumericTuples(db, prevStates, numericTuples)
}
func loadStateAfterEventsForNumericTuples(
db RoomStateDatabase, prevStates []types.StateAtEvent, stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntry, error) {
if len(prevStates) == 1 {
// Fast path for a single event.
prevState := prevStates[0]
result, err := loadStateAtSnapshotForNumericTuples(
db, prevState.BeforeStateSnapshotNID, stateKeyTuples,
)
if err != nil {
return nil, err
}
if prevState.IsStateEvent() {
// The result is current the state before the requested event.
// We want the state after the requested event.
// If the requested event was a state event then we need to
// update that key in the result.
// If the requested event wasn't a state event then the state after
// it is the same as the state before it.
for i := range result {
if result[i].StateKeyTuple == prevState.StateKeyTuple {
result[i] = prevState.StateEntry
}
}
}
return result, nil
}
// Slow path for more that one event.
// Load the entire state so that we can do conflict resolution if we need to.
// TODO: The are some optimistations we could do here:
// 1) We only need to do conflict resolution if there is a conflict in the
// requested tuples so we might try loading just those tuples and then
// checking for conflicts.
// 2) When there is a conflict we still only need to load the state
// needed to do conflict resolution which would save us having to load
// the full state.
// TODO: Add metrics for this as it could take a long time for big rooms
// with large conflicts.
fullState, _, _, err := calculateStateAfterManyEvents(db, prevStates)
if err != nil {
return nil, err
}
// Sort the full state so we can use it as a map.
sort.Sort(stateEntrySorter(fullState))
// Filter the full state down to the required tuples.
var result []types.StateEntry
for _, tuple := range stateKeyTuples {
eventNID, ok := stateEntryMap(fullState).lookup(tuple)
if ok {
result = append(result, types.StateEntry{tuple, eventNID})
}
}
sort.Sort(stateEntrySorter(result))
return result, nil
}
var calculateStateDurations = prometheus.NewSummaryVec( var calculateStateDurations = prometheus.NewSummaryVec(
prometheus.SummaryOpts{ prometheus.SummaryOpts{
Namespace: "dendrite", Namespace: "dendrite",
@ -491,12 +567,30 @@ const maxStateBlockNIDs = 64
func calculateAndStoreStateAfterManyEvents( func calculateAndStoreStateAfterManyEvents(
db RoomStateDatabase, roomNID types.RoomNID, prevStates []types.StateAtEvent, metrics calculateStateMetrics, db RoomStateDatabase, roomNID types.RoomNID, prevStates []types.StateAtEvent, metrics calculateStateMetrics,
) (types.StateSnapshotNID, error) { ) (types.StateSnapshotNID, error) {
state, algorithm, conflictLength, err := calculateStateAfterManyEvents(db, prevStates)
metrics.algorithm = algorithm
if err != nil {
return metrics.stop(0, err)
}
// TODO: Check if we can encode the new state as a delta against the
// previous state.
metrics.conflictLength = conflictLength
metrics.fullStateLength = len(state)
return metrics.stop(db.AddState(roomNID, nil, state))
}
func calculateStateAfterManyEvents(
db RoomStateDatabase, prevStates []types.StateAtEvent,
) (state []types.StateEntry, algorithm string, conflictLength int, err error) {
var combined []types.StateEntry
// Conflict resolution. // Conflict resolution.
// First stage: load the state after each of the prev events. // First stage: load the state after each of the prev events.
combined, err := LoadCombinedStateAfterEvents(db, prevStates) combined, err = LoadCombinedStateAfterEvents(db, prevStates)
if err != nil { if err != nil {
metrics.algorithm = "_load_combined_state" algorithm = "_load_combined_state"
return metrics.stop(0, err) return
} }
// Collect all the entries with the same type and key together. // Collect all the entries with the same type and key together.
@ -508,9 +602,8 @@ func calculateAndStoreStateAfterManyEvents(
// Find the conflicts // Find the conflicts
conflicts := findDuplicateStateKeys(combined) conflicts := findDuplicateStateKeys(combined)
var state []types.StateEntry
if len(conflicts) > 0 { if len(conflicts) > 0 {
metrics.conflictLength = len(conflicts) conflictLength = len(conflicts)
// 5) There are conflicting state events, for each conflict workout // 5) There are conflicting state events, for each conflict workout
// what the appropriate state event is. // what the appropriate state event is.
@ -523,23 +616,20 @@ func calculateAndStoreStateAfterManyEvents(
} }
} }
resolved, err := resolveConflicts(db, notConflicted, conflicts) var resolved []types.StateEntry
resolved, err = resolveConflicts(db, notConflicted, conflicts)
if err != nil { if err != nil {
metrics.algorithm = "_resolve_conflicts" algorithm = "_resolve_conflicts"
return metrics.stop(0, err) return
} }
metrics.algorithm = "full_state_with_conflicts" algorithm = "full_state_with_conflicts"
state = resolved state = resolved
} else { } else {
metrics.algorithm = "full_state_no_conflicts" algorithm = "full_state_no_conflicts"
// 6) There weren't any conflicts // 6) There weren't any conflicts
state = combined state = combined
} }
metrics.fullStateLength = len(state) return
// TODO: Check if we can encode the new state as a delta against the
// previous state.
return metrics.stop(db.AddState(roomNID, nil, state))
} }
// resolveConflicts resolves a list of conflicted state entries. It takes two lists. // resolveConflicts resolves a list of conflicted state entries. It takes two lists.