diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index aea912c2..3b46487a 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -48,7 +48,7 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs for i, event := range events { ires[i] = api.InputRoomEvent{ Kind: api.KindNew, - Event: event.JSON(), + Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), } @@ -70,7 +70,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat for i, outlier := range outliers { ires[i] = api.InputRoomEvent{ Kind: api.KindOutlier, - Event: outlier.JSON(), + Event: outlier, AuthEventIDs: outlier.AuthEventIDs(), } eventIDs[i] = outlier.EventID() @@ -83,7 +83,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat ires[len(outliers)] = api.InputRoomEvent{ Kind: api.KindNew, - Event: event.JSON(), + Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, StateEventIDs: stateEventIDs, diff --git a/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go b/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go index ff949d3e..a37c7685 100644 --- a/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go @@ -21,12 +21,13 @@ import ( "encoding/json" "flag" "fmt" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" - "golang.org/x/crypto/ed25519" "os" "strings" "time" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "golang.org/x/crypto/ed25519" ) const usage = `Usage: %s @@ -131,7 +132,7 @@ func writeEvent(event gomatrixserverlib.Event) { if *format == "InputRoomEvent" { var ire api.InputRoomEvent ire.Kind = api.KindNew - ire.Event = event.JSON() + ire.Event = event authEventIDs := []string{} for _, ref := range b.AuthEvents { authEventIDs = append(authEventIDs, ref.EventID) diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go index 64d4ffc5..c4bea7f3 100644 --- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go @@ -252,9 +252,9 @@ func main() { input := []string{ `{ - "AuthEventIDs": [], - "Kind": 1, - "Event": { + "auth_event_ids": [], + "kind": 1, + "event": { "origin": "matrix.org", "signatures": { "matrix.org": { @@ -274,10 +274,10 @@ func main() { "hashes": {"sha256": "Q05VLC8nztN2tguy+KnHxxhitI95wK9NelnsDaXRqeo"}, "type": "m.room.create"} }`, `{ - "AuthEventIDs": ["$1463671337126266wrSBX:matrix.org"], - "Kind": 2, - "StateEventIDs": ["$1463671337126266wrSBX:matrix.org"], - "Event": { + "auth_event_ids": ["$1463671337126266wrSBX:matrix.org"], + "kind": 2, + "state_event_ids": ["$1463671337126266wrSBX:matrix.org"], + "event": { "origin": "matrix.org", "signatures": { "matrix.org": { @@ -305,7 +305,7 @@ func main() { ]], "hashes": {"sha256": "t9t3sZV1Eu0P9Jyrs7pge6UTa1zuTbRdVxeUHnrQVH0"}, "type": "m.room.member"}, - "HasState": true + "has_state": true }`, } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index 4d576aef..558eb28c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -16,7 +16,9 @@ package api import ( - "encoding/json" + "net/http" + + "github.com/matrix-org/gomatrixserverlib" ) const ( @@ -25,19 +27,14 @@ const ( // These events are state events used to authenticate other events. // They can become part of the contiguous event graph via backfill. KindOutlier = 1 - // KindJoin event start a new contiguous event graph. The event must be a - // m.room.member event joining this server to the room. This must come with - // the state at the event. If the event is contiguous with the existing - // graph for the room then it is treated as a normal new event. - KindJoin = 2 // KindNew event extend the contiguous graph going forwards. // They usually don't need state, but may include state if the // there was a new event that references an event that we don't // have a copy of. - KindNew = 3 + KindNew = 2 // KindBackfill event extend the contiguous graph going backwards. // They always have state. - KindBackfill = 4 + KindBackfill = 3 ) // DoNotSendToOtherServers tells us not to send the event to other matrix @@ -49,77 +46,66 @@ const DoNotSendToOtherServers = "" type InputRoomEvent struct { // Whether this event is new, backfilled or an outlier. // This controls how the event is processed. - Kind int + Kind int `json:"kind"` // The event JSON for the event to add. - Event []byte + Event gomatrixserverlib.Event `json:"event"` // List of state event IDs that authenticate this event. // These are likely derived from the "auth_events" JSON key of the event. // But can be different because the "auth_events" key can be incomplete or wrong. // For example many matrix events forget to reference the m.room.create event even though it is needed for auth. // (since synapse allows this to happen we have to allow it as well.) - AuthEventIDs []string + AuthEventIDs []string `json:"auth_event_ids"` // Whether the state is supplied as a list of event IDs or whether it // should be derived from the state at the previous events. - HasState bool + HasState bool `json:"has_state"` // Optional list of state event IDs forming the state before this event. // These state events must have already been persisted. // These are only used if HasState is true. // The list can be empty, for example when storing the first event in a room. - StateEventIDs []string + StateEventIDs []string `json:"state_event_ids"` // The server name to use to push this event to other servers. // Or empty if this event shouldn't be pushed to other servers. - SendAsServer string + SendAsServer string `json:"send_as_server"` } -// UnmarshalJSON implements json.Unmarshaller -func (ire *InputRoomEvent) UnmarshalJSON(data []byte) error { - // Create a struct rather than unmarshalling directly into the InputRoomEvent - // so that we can use json.RawMessage. - // We use json.RawMessage so that the event JSON is sent as JSON rather than - // being base64 encoded which is the default for []byte. - var content struct { - Kind int - Event *json.RawMessage - AuthEventIDs []string - StateEventIDs []string - HasState bool - SendAsServer string - } - if err := json.Unmarshal(data, &content); err != nil { - return err - } - ire.Kind = content.Kind - ire.AuthEventIDs = content.AuthEventIDs - ire.StateEventIDs = content.StateEventIDs - ire.HasState = content.HasState - ire.SendAsServer = content.SendAsServer - if content.Event != nil { - ire.Event = []byte(*content.Event) - } - return nil +// InputRoomEventsRequest is a request to InputRoomEvents +type InputRoomEventsRequest struct { + InputRoomEvents []InputRoomEvent `json:"input_room_events"` } -// MarshalJSON implements json.Marshaller -func (ire InputRoomEvent) MarshalJSON() ([]byte, error) { - // Create a struct rather than marshalling directly from the InputRoomEvent - // so that we can use json.RawMessage. - // We use json.RawMessage so that the event JSON is sent as JSON rather than - // being base64 encoded which is the default for []byte. - event := json.RawMessage(ire.Event) - content := struct { - Kind int - Event *json.RawMessage - AuthEventIDs []string - StateEventIDs []string - HasState bool - SendAsServer string - }{ - Kind: ire.Kind, - AuthEventIDs: ire.AuthEventIDs, - StateEventIDs: ire.StateEventIDs, - Event: &event, - HasState: ire.HasState, - SendAsServer: ire.SendAsServer, - } - return json.Marshal(&content) +// InputRoomEventsResponse is a response to InputRoomEvents +type InputRoomEventsResponse struct{} + +// RoomserverInputAPI is used to write events to the room server. +type RoomserverInputAPI interface { + InputRoomEvents( + request *InputRoomEventsRequest, + response *InputRoomEventsResponse, + ) error +} + +// RoomserverInputRoomEventsPath is the HTTP path for the InputRoomEvents API. +const RoomserverInputRoomEventsPath = "/api/roomserver/inputRoomEvents" + +// NewRoomserverInputAPIHTTP creates a RoomserverInputAPI implemented by talking to a HTTP POST API. +// If httpClient is nil then it uses the http.DefaultClient +func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverInputAPI { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &httpRoomserverInputAPI{roomserverURL, httpClient} +} + +type httpRoomserverInputAPI struct { + roomserverURL string + httpClient *http.Client +} + +// InputRoomEvents implements RoomserverInputAPI +func (h *httpRoomserverInputAPI) InputRoomEvents( + request *InputRoomEventsRequest, + response *InputRoomEventsResponse, +) error { + apiURL := h.roomserverURL + RoomserverInputRoomEventsPath + return postJSON(h.httpClient, apiURL, request, response) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index 9442f9f6..6e6a838a 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -136,12 +136,12 @@ func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) Ro if httpClient == nil { httpClient = http.DefaultClient } - return &httpRoomserverQueryAPI{roomserverURL, *httpClient} + return &httpRoomserverQueryAPI{roomserverURL, httpClient} } type httpRoomserverQueryAPI struct { roomserverURL string - httpClient http.Client + httpClient *http.Client } // QueryLatestEventsAndState implements RoomserverQueryAPI @@ -171,7 +171,7 @@ func (h *httpRoomserverQueryAPI) QueryEventsByID( return postJSON(h.httpClient, apiURL, request, response) } -func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error { +func postJSON(httpClient *http.Client, apiURL string, request, response interface{}) error { jsonBytes, err := json.Marshal(request) if err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index acf30221..f8acff47 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -49,10 +49,7 @@ type OutputRoomEventWriter interface { func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { // Parse and validate the event JSON - event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event) - if err != nil { - return err - } + event := input.Event // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(db, event, input.AuthEventIDs) @@ -79,8 +76,8 @@ func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api. if input.HasState { // We've been told what the state at the event is so we don't need to calculate it. // Check that those state events are in the database and store the state. - entries, err := db.StateEntriesForEventIDs(input.StateEventIDs) - if err != nil { + var entries []types.StateEntry + if entries, err = db.StateEntriesForEventIDs(input.StateEventIDs); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go new file mode 100644 index 00000000..ffbebd0c --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -0,0 +1,107 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package input contains the code processes new room events +package input + +import ( + "encoding/json" + "fmt" + "sync/atomic" + + "net/http" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/util" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// RoomserverInputAPI implements api.RoomserverInputAPI +type RoomserverInputAPI struct { + DB RoomEventDatabase + Producer sarama.SyncProducer + // The kafkaesque topic to output new room events to. + // This is the name used in kafka to identify the stream to write events to. + OutputRoomEventTopic string + // If non-nil then the API will stop processing messages after this + // many messages and will shutdown. Malformed messages are not in the count. + StopProcessingAfter *int64 + // If not-nil then the API will call this to shutdown the server. + // If this is nil then the API will continue to process messsages even + // though StopProcessingAfter has been reached. + ShutdownCallback func(reason string) + // How many messages the consumer has processed. + processed int64 +} + +// WriteOutputRoomEvent implements OutputRoomEventWriter +func (r *RoomserverInputAPI) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error { + var m sarama.ProducerMessage + oe := api.OutputEvent{ + Type: api.OutputTypeNewRoomEvent, + NewRoomEvent: &output, + } + value, err := json.Marshal(oe) + if err != nil { + return err + } + m.Topic = r.OutputRoomEventTopic + m.Key = sarama.StringEncoder("") + m.Value = sarama.ByteEncoder(value) + _, _, err = r.Producer.SendMessage(&m) + return err +} + +// InputRoomEvents implements api.RoomserverInputAPI +func (r *RoomserverInputAPI) InputRoomEvents( + request *api.InputRoomEventsRequest, + response *api.InputRoomEventsResponse, +) error { + for i := range request.InputRoomEvents { + if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil { + return err + } + // Update the number of processed messages using atomic addition because it is accessed from multiple goroutines. + processed := atomic.AddInt64(&r.processed, 1) + // Check if we should stop processing. + // Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages. + // If we try to stop processing after M message and we have N goroutines then we will process somewhere + // between M and (N + M) messages because the N goroutines could all try to process what they think will be the + // last message. We could be more careful here but this is good enough for getting rough benchmarks. + if r.StopProcessingAfter != nil && processed >= int64(*r.StopProcessingAfter) { + if r.ShutdownCallback != nil { + r.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", r.processed)) + } + } + } + return nil +} + +// SetupHTTP adds the RoomserverInputAPI handlers to the http.ServeMux. +func (r *RoomserverInputAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle(api.RoomserverInputRoomEventsPath, + common.MakeAPI("inputRoomEvents", func(req *http.Request) util.JSONResponse { + var request api.InputRoomEventsRequest + var response api.InputRoomEventsResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(400, err.Error()) + } + if err := r.InputRoomEvents(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 31ebc022..142df90e 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -16,13 +16,14 @@ package query import ( "encoding/json" + "net/http" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "net/http" ) // RoomserverQueryAPIDatabase has the storage APIs needed to implement the query API. @@ -173,7 +174,7 @@ func (r *RoomserverQueryAPI) loadEvents(eventNIDs []types.EventNID) ([]gomatrixs func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { servMux.Handle( api.RoomserverQueryLatestEventsAndStatePath, - common.MakeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse { + common.MakeAPI("queryLatestEventsAndState", func(req *http.Request) util.JSONResponse { var request api.QueryLatestEventsAndStateRequest var response api.QueryLatestEventsAndStateResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -187,7 +188,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { ) servMux.Handle( api.RoomserverQueryStateAfterEventsPath, - common.MakeAPI("query_state_after_events", func(req *http.Request) util.JSONResponse { + common.MakeAPI("queryStateAfterEvents", func(req *http.Request) util.JSONResponse { var request api.QueryStateAfterEventsRequest var response api.QueryStateAfterEventsResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -201,7 +202,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { ) servMux.Handle( api.RoomserverQueryEventsByIDPath, - common.MakeAPI("query_events_by_id", func(req *http.Request) util.JSONResponse { + common.MakeAPI("queryEventsByID", func(req *http.Request) util.JSONResponse { var request api.QueryEventsByIDRequest var response api.QueryEventsByIDResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil {