From ef7b934d51b95bbd68fa32e4b475f24b430010bb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 2 Jun 2017 14:32:36 +0100 Subject: [PATCH] Add Query API for querying events by ID (#129) --- .../dendrite/roomserver/api/query.go | 40 +++++++++++++++ .../dendrite/roomserver/query/query.go | 50 ++++++++++++++++++- .../roomserver/storage/events_table.go | 25 ++++++++++ .../dendrite/roomserver/storage/storage.go | 5 ++ 4 files changed, 118 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index 14afb3e8..6f26212d 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -41,6 +41,7 @@ type QueryLatestEventsAndStateResponse struct { // The latest events in the room. LatestEvents []gomatrixserverlib.EventReference // The state events requested. + // This list will be in an arbitrary order. StateEvents []gomatrixserverlib.Event } @@ -65,9 +66,30 @@ type QueryStateAfterEventsResponse struct { // If some of previous events do not exist this will be false and StateEvents will be empty. PrevEventsExist bool // The state events requested. + // This list will be in an arbitrary order. StateEvents []gomatrixserverlib.Event } +// QueryEventsByIDRequest is a request to QueryEventsByID +type QueryEventsByIDRequest struct { + // The event IDs to look up. + EventIDs []string +} + +// QueryEventsByIDResponse is a response to QueryEventsByID +type QueryEventsByIDResponse struct { + // Copy of the request for debugging. + QueryEventsByIDRequest + // A list of events with the requested IDs. + // If the roomserver does not have a copy of a requested event + // then it will omit that event from the list. + // If the roomserver thinks it has a copy of the event, but + // fails to read it from the database then it will fail + // the entire request. + // This list will be in an arbitrary order. + Events []gomatrixserverlib.Event +} + // RoomserverQueryAPI is used to query information from the room server. type RoomserverQueryAPI interface { // Query the latest events and state for a room from the room server. @@ -81,6 +103,12 @@ type RoomserverQueryAPI interface { request *QueryStateAfterEventsRequest, response *QueryStateAfterEventsResponse, ) error + + // Query a list of events by event ID. + QueryEventsByID( + request *QueryEventsByIDRequest, + response *QueryEventsByIDResponse, + ) error } // RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API. @@ -89,6 +117,9 @@ const RoomserverQueryLatestEventsAndStatePath = "/api/roomserver/QueryLatestEven // RoomserverQueryStateAfterEventsPath is the HTTP path for the QueryStateAfterEvents API. const RoomserverQueryStateAfterEventsPath = "/api/roomserver/QueryStateAfterEvents" +// RoomserverQueryEventsByIDPath is the HTTP path for the QueryEventsByID API. +const RoomserverQueryEventsByIDPath = "/api/roomserver/QueryEventsByID" + // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // If httpClient is nil then it uses the http.DefaultClient func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverQueryAPI { @@ -121,6 +152,15 @@ func (h *httpRoomserverQueryAPI) QueryStateAfterEvents( return postJSON(h.httpClient, apiURL, request, response) } +// QueryEventsByID implements RoomserverQueryAPI +func (h *httpRoomserverQueryAPI) QueryEventsByID( + request *QueryEventsByIDRequest, + response *QueryEventsByIDResponse, +) error { + apiURL := h.roomserverURL + RoomserverQueryEventsByIDPath + return postJSON(h.httpClient, apiURL, request, response) +} + func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error { jsonBytes, err := json.Marshal(request) if err != nil { diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index fedc9870..6f236e93 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -35,6 +35,9 @@ type RoomserverQueryAPIDatabase interface { // Lookup event references for the latest events in the room and the current state snapshot. // Returns an error if there was a problem talking to the database. LatestEventIDs(roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, error) + // Lookup the numeric IDs for a list of events. + // Returns an error if there was a problem talking to the database. + EventNIDs(eventIDs []string) (map[string]types.EventNID, error) } // RoomserverQueryAPI is an implementation of RoomserverQueryAPI @@ -46,7 +49,7 @@ type RoomserverQueryAPI struct { func (r *RoomserverQueryAPI) QueryLatestEventsAndState( request *api.QueryLatestEventsAndStateRequest, response *api.QueryLatestEventsAndStateResponse, -) (err error) { +) error { response.QueryLatestEventsAndStateRequest = *request roomNID, err := r.DB.RoomNID(request.RoomID) if err != nil { @@ -81,7 +84,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( func (r *RoomserverQueryAPI) QueryStateAfterEvents( request *api.QueryStateAfterEventsRequest, response *api.QueryStateAfterEventsResponse, -) (err error) { +) error { response.QueryStateAfterEventsRequest = *request roomNID, err := r.DB.RoomNID(request.RoomID) if err != nil { @@ -115,12 +118,41 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( return nil } +// QueryEventsByID implements api.RoomserverQueryAPI +func (r *RoomserverQueryAPI) QueryEventsByID( + request *api.QueryEventsByIDRequest, + response *api.QueryEventsByIDResponse, +) error { + response.QueryEventsByIDRequest = *request + + eventNIDMap, err := r.DB.EventNIDs(request.EventIDs) + if err != nil { + return err + } + + var eventNIDs []types.EventNID + for _, nid := range eventNIDMap { + eventNIDs = append(eventNIDs, nid) + } + + events, err := r.loadEvents(eventNIDs) + if err != nil { + return err + } + + response.Events = events + return nil +} + func (r *RoomserverQueryAPI) loadStateEvents(stateEntries []types.StateEntry) ([]gomatrixserverlib.Event, error) { eventNIDs := make([]types.EventNID, len(stateEntries)) for i := range stateEntries { eventNIDs[i] = stateEntries[i].EventNID } + return r.loadEvents(eventNIDs) +} +func (r *RoomserverQueryAPI) loadEvents(eventNIDs []types.EventNID) ([]gomatrixserverlib.Event, error) { stateEvents, err := r.DB.Events(eventNIDs) if err != nil { return nil, err @@ -163,4 +195,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { return util.JSONResponse{Code: 200, JSON: &response} }), ) + servMux.Handle( + api.RoomserverQueryEventsByIDPath, + common.MakeAPI("query_events_by_id", func(req *http.Request) util.JSONResponse { + var request api.QueryEventsByIDRequest + var response api.QueryEventsByIDResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.QueryEventsByID(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index 696d35be..acaf43b6 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -104,6 +104,9 @@ const bulkSelectEventReferenceSQL = "" + const bulkSelectEventIDSQL = "" + "SELECT event_nid, event_id FROM events WHERE event_nid = ANY($1)" +const bulkSelectEventNIDSQL = "" + + "SELECT event_id, event_nid FROM events WHERE event_id = ANY($1)" + type eventStatements struct { insertEventStmt *sql.Stmt selectEventStmt *sql.Stmt @@ -116,6 +119,7 @@ type eventStatements struct { bulkSelectStateAtEventAndReferenceStmt *sql.Stmt bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt + bulkSelectEventNIDStmt *sql.Stmt } func (s *eventStatements) prepare(db *sql.DB) (err error) { @@ -136,6 +140,7 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { {&s.bulkSelectStateAtEventAndReferenceStmt, bulkSelectStateAtEventAndReferenceSQL}, {&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, + {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, }.prepare(db) } @@ -321,6 +326,26 @@ func (s *eventStatements) bulkSelectEventID(eventNIDs []types.EventNID) (map[typ return results, nil } +// bulkSelectEventNIDs returns a map from string event ID to numeric event ID. +// If an event ID is not in the database then it is omitted from the map. +func (s *eventStatements) bulkSelectEventNID(eventIDs []string) (map[string]types.EventNID, error) { + rows, err := s.bulkSelectEventNIDStmt.Query(pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() + results := make(map[string]types.EventNID, len(eventIDs)) + for rows.Next() { + var eventID string + var eventNID int64 + if err = rows.Scan(&eventID, &eventNID); err != nil { + return nil, err + } + results[eventID] = types.EventNID(eventNID) + } + return results, nil +} + func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { nids := make([]int64, len(eventNIDs)) for i := range eventNIDs { diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 8e527d63..b9b5eb1c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -170,6 +170,11 @@ func (d *Database) EventStateKeyNIDs(eventStateKeys []string) (map[string]types. return d.statements.bulkSelectEventStateKeyNID(eventStateKeys) } +// EventNIDs implements query.RoomQueryDatabase +func (d *Database) EventNIDs(eventIDs []string) (map[string]types.EventNID, error) { + return d.statements.bulkSelectEventNID(eventIDs) +} + // Events implements input.EventDatabase func (d *Database) Events(eventNIDs []types.EventNID) ([]types.Event, error) { eventJSONs, err := d.statements.bulkSelectEventJSON(eventNIDs)