Minor perf/debugging improvements (#1121)
* Minor perf/debugging improvements - publicroomsapi: Don't call QueryEventsByID with no event IDs - appservice: Consume only if there are 1 or more ASes - roomserver: don't keep a copy of the request "for debugging" - we trace now * fedsender: return early if we have no destinations * Unbreak testsmain
parent
ecd7accbad
commit
0dc4ceaa2d
|
@ -86,12 +86,16 @@ func NewInternalAPI(
|
|||
Cfg: base.Cfg,
|
||||
}
|
||||
|
||||
consumer := consumers.NewOutputRoomEventConsumer(
|
||||
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
|
||||
rsAPI, workerStates,
|
||||
)
|
||||
if err := consumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
|
||||
// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
|
||||
// We can't add ASes at runtime so this is safe to do.
|
||||
if len(workerStates) > 0 {
|
||||
consumer := consumers.NewOutputRoomEventConsumer(
|
||||
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
|
||||
rsAPI, workerStates,
|
||||
)
|
||||
if err := consumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
|
||||
}
|
||||
}
|
||||
|
||||
// Create application service transaction workers
|
||||
|
|
|
@ -128,7 +128,6 @@ func (t *testRoomserverAPI) QueryLatestEventsAndState(
|
|||
response *api.QueryLatestEventsAndStateResponse,
|
||||
) error {
|
||||
r := t.queryLatestEventsAndState(request)
|
||||
response.QueryLatestEventsAndStateRequest = *request
|
||||
response.RoomExists = r.RoomExists
|
||||
response.RoomVersion = testRoomVersion
|
||||
response.LatestEvents = r.LatestEvents
|
||||
|
@ -144,7 +143,6 @@ func (t *testRoomserverAPI) QueryStateAfterEvents(
|
|||
response *api.QueryStateAfterEventsResponse,
|
||||
) error {
|
||||
response.RoomVersion = testRoomVersion
|
||||
response.QueryStateAfterEventsRequest = *request
|
||||
res := t.queryStateAfterEvents(request)
|
||||
response.PrevEventsExist = res.PrevEventsExist
|
||||
response.RoomExists = res.RoomExists
|
||||
|
@ -612,7 +610,6 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
res.QueryEventsByIDRequest = *req
|
||||
return res
|
||||
},
|
||||
}
|
||||
|
|
|
@ -107,6 +107,9 @@ func (oqs *OutgoingQueues) SendEvent(
|
|||
|
||||
// Remove our own server from the list of destinations.
|
||||
destinations = filterAndDedupeDests(oqs.origin, destinations)
|
||||
if len(destinations) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"destinations": destinations, "event": ev.EventID(),
|
||||
|
|
|
@ -78,18 +78,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
|
||||
var remQueryRes api.QueryEventsByIDResponse
|
||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
|
||||
log.Warn(err)
|
||||
return err
|
||||
if len(output.NewRoomEvent.RemovesStateEventIDs) > 0 {
|
||||
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
|
||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
|
||||
log.Warn(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var addQueryEvents, remQueryEvents []gomatrixserverlib.Event
|
||||
for _, headeredEvent := range output.NewRoomEvent.AddsState() {
|
||||
addQueryEvents = append(addQueryEvents, headeredEvent.Event)
|
||||
}
|
||||
addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap())
|
||||
for _, headeredEvent := range remQueryRes.Events {
|
||||
remQueryEvents = append(remQueryEvents, headeredEvent.Event)
|
||||
}
|
||||
|
|
|
@ -33,8 +33,6 @@ type QueryLatestEventsAndStateRequest struct {
|
|||
// This is used when sending events to set the prev_events, auth_events and depth.
|
||||
// It is also used to tell whether the event is allowed by the event auth rules.
|
||||
type QueryLatestEventsAndStateResponse struct {
|
||||
// Copy of the request for debugging.
|
||||
QueryLatestEventsAndStateRequest
|
||||
// Does the room exist?
|
||||
// If the room doesn't exist this will be false and LatestEvents will be empty.
|
||||
RoomExists bool `json:"room_exists"`
|
||||
|
@ -66,8 +64,6 @@ type QueryStateAfterEventsRequest struct {
|
|||
|
||||
// QueryStateAfterEventsResponse is a response to QueryStateAfterEvents
|
||||
type QueryStateAfterEventsResponse struct {
|
||||
// Copy of the request for debugging.
|
||||
QueryStateAfterEventsRequest
|
||||
// Does the room exist on this roomserver?
|
||||
// If the room doesn't exist this will be false and StateEvents will be empty.
|
||||
RoomExists bool `json:"room_exists"`
|
||||
|
@ -89,8 +85,6 @@ type QueryEventsByIDRequest struct {
|
|||
|
||||
// QueryEventsByIDResponse is a response to QueryEventsByID
|
||||
type QueryEventsByIDResponse struct {
|
||||
// Copy of the request for debugging.
|
||||
QueryEventsByIDRequest
|
||||
// A list of events with the requested IDs.
|
||||
// If the roomserver does not have a copy of a requested event
|
||||
// then it will omit that event from the list.
|
||||
|
@ -187,8 +181,6 @@ type QueryStateAndAuthChainRequest struct {
|
|||
|
||||
// QueryStateAndAuthChainResponse is a response to QueryStateAndAuthChain
|
||||
type QueryStateAndAuthChainResponse struct {
|
||||
// Copy of the request for debugging.
|
||||
QueryStateAndAuthChainRequest
|
||||
// Does the room exist on this roomserver?
|
||||
// If the room doesn't exist this will be false and StateEvents will be empty.
|
||||
RoomExists bool `json:"room_exists"`
|
||||
|
|
|
@ -45,7 +45,6 @@ func (r *RoomserverInternalAPI) QueryLatestEventsAndState(
|
|||
|
||||
roomState := state.NewStateResolution(r.DB)
|
||||
|
||||
response.QueryLatestEventsAndStateRequest = *request
|
||||
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -105,7 +104,6 @@ func (r *RoomserverInternalAPI) QueryStateAfterEvents(
|
|||
|
||||
roomState := state.NewStateResolution(r.DB)
|
||||
|
||||
response.QueryStateAfterEventsRequest = *request
|
||||
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -153,8 +151,6 @@ func (r *RoomserverInternalAPI) QueryEventsByID(
|
|||
request *api.QueryEventsByIDRequest,
|
||||
response *api.QueryEventsByIDResponse,
|
||||
) error {
|
||||
response.QueryEventsByIDRequest = *request
|
||||
|
||||
eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -734,7 +730,6 @@ func (r *RoomserverInternalAPI) QueryStateAndAuthChain(
|
|||
request *api.QueryStateAndAuthChainRequest,
|
||||
response *api.QueryStateAndAuthChainResponse,
|
||||
) error {
|
||||
response.QueryStateAndAuthChainRequest = *request
|
||||
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue