Roomserver API changes (#1118)
* s/QueryBackfill/PerformBackfill/g * OutputEvent now includes AddStateEvents which contain the full event of extra state events * Only include adds not the current event * Get adding state rightmain
parent
25cd2dd1c9
commit
ec7718e7f8
|
@ -91,60 +91,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ev := output.NewRoomEvent.Event
|
events := []gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
|
||||||
log.WithFields(log.Fields{
|
events = append(events, output.NewRoomEvent.AddStateEvents...)
|
||||||
"event_id": ev.EventID(),
|
|
||||||
"room_id": ev.RoomID(),
|
|
||||||
"type": ev.Type(),
|
|
||||||
}).Info("appservice received an event from roomserver")
|
|
||||||
|
|
||||||
missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
events := append(missingEvents, ev)
|
|
||||||
|
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
return s.filterRoomserverEvents(context.TODO(), events)
|
return s.filterRoomserverEvents(context.TODO(), events)
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookupMissingStateEvents looks up the state events that are added by a new event,
|
|
||||||
// and returns any not already present.
|
|
||||||
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
|
|
||||||
addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
|
|
||||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
|
||||||
// Fast path if there aren't any new state events.
|
|
||||||
if len(addsStateEventIDs) == 0 {
|
|
||||||
return []gomatrixserverlib.HeaderedEvent{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fast path if the only state event added is the event itself.
|
|
||||||
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
|
||||||
return []gomatrixserverlib.HeaderedEvent{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
result := []gomatrixserverlib.HeaderedEvent{}
|
|
||||||
missing := []string{}
|
|
||||||
for _, id := range addsStateEventIDs {
|
|
||||||
if id != event.EventID() {
|
|
||||||
// If the event isn't the current one, add it to the list of events
|
|
||||||
// to retrieve from the roomserver
|
|
||||||
missing = append(missing, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request the missing events from the roomserver
|
|
||||||
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
|
||||||
var eventResp api.QueryEventsByIDResponse
|
|
||||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
result = append(result, eventResp.Events...)
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// filterRoomserverEvents takes in events and decides whether any of them need
|
// filterRoomserverEvents takes in events and decides whether any of them need
|
||||||
// to be passed on to an external application service. It does this by checking
|
// to be passed on to an external application service. It does this by checking
|
||||||
// each namespace of each registered application service, and if there is a
|
// each namespace of each registered application service, and if there is a
|
||||||
|
|
|
@ -84,63 +84,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ev := output.NewRoomEvent.Event
|
return s.db.UpdateMemberships(
|
||||||
log.WithFields(log.Fields{
|
context.TODO(),
|
||||||
"event_id": ev.EventID(),
|
gomatrixserverlib.UnwrapEventHeaders(output.NewRoomEvent.AddsState()),
|
||||||
"room_id": ev.RoomID(),
|
output.NewRoomEvent.RemovesStateEventIDs,
|
||||||
"type": ev.Type(),
|
)
|
||||||
}).Info("received event from roomserver")
|
|
||||||
|
|
||||||
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs)
|
|
||||||
}
|
|
||||||
|
|
||||||
// lookupStateEvents looks up the state events that are added by a new event.
|
|
||||||
func (s *OutputRoomEventConsumer) lookupStateEvents(
|
|
||||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
|
||||||
// Fast path if there aren't any new state events.
|
|
||||||
if len(addsStateEventIDs) == 0 {
|
|
||||||
// If the event is a membership update (e.g. for a profile update), it won't
|
|
||||||
// show up in AddsStateEventIDs, so we need to add it manually
|
|
||||||
if event.Type() == "m.room.member" {
|
|
||||||
return []gomatrixserverlib.Event{event}, nil
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fast path if the only state event added is the event itself.
|
|
||||||
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
|
||||||
return []gomatrixserverlib.Event{event}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
result := []gomatrixserverlib.Event{}
|
|
||||||
missing := []string{}
|
|
||||||
for _, id := range addsStateEventIDs {
|
|
||||||
// Append the current event in the results if its ID is in the events list
|
|
||||||
if id == event.EventID() {
|
|
||||||
result = append(result, event)
|
|
||||||
} else {
|
|
||||||
// If the event isn't the current one, add it to the list of events
|
|
||||||
// to retrieve from the roomserver
|
|
||||||
missing = append(missing, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request the missing events from the roomserver
|
|
||||||
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
|
||||||
var eventResp api.QueryEventsByIDResponse
|
|
||||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, headeredEvent := range eventResp.Events {
|
|
||||||
result = append(result, headeredEvent.Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ func Backfill(
|
||||||
roomID string,
|
roomID string,
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
var res api.QueryBackfillResponse
|
var res api.PerformBackfillResponse
|
||||||
var eIDs []string
|
var eIDs []string
|
||||||
var limit string
|
var limit string
|
||||||
var exists bool
|
var exists bool
|
||||||
|
@ -68,7 +68,7 @@ func Backfill(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Populate the request.
|
// Populate the request.
|
||||||
req := api.QueryBackfillRequest{
|
req := api.PerformBackfillRequest{
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
// we don't know who the successors are for these events, which won't
|
// we don't know who the successors are for these events, which won't
|
||||||
// be a problem because we don't use that information when servicing /backfill requests,
|
// be a problem because we don't use that information when servicing /backfill requests,
|
||||||
|
@ -87,8 +87,8 @@ func Backfill(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query the roomserver.
|
// Query the roomserver.
|
||||||
if err = rsAPI.QueryBackfill(httpReq.Context(), &req, &res); err != nil {
|
if err = rsAPI.PerformBackfill(httpReq.Context(), &req, &res); err != nil {
|
||||||
util.GetLogger(httpReq.Context()).WithError(err).Error("query.QueryBackfill failed")
|
util.GetLogger(httpReq.Context()).WithError(err).Error("query.PerformBackfill failed")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -211,10 +211,10 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query a given amount (or less) of events prior to a given set of events.
|
// Query a given amount (or less) of events prior to a given set of events.
|
||||||
func (t *testRoomserverAPI) QueryBackfill(
|
func (t *testRoomserverAPI) PerformBackfill(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryBackfillRequest,
|
request *api.PerformBackfillRequest,
|
||||||
response *api.QueryBackfillResponse,
|
response *api.PerformBackfillResponse,
|
||||||
) error {
|
) error {
|
||||||
return fmt.Errorf("not implemented")
|
return fmt.Errorf("not implemented")
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,11 +131,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// processMessage updates the list of currently joined hosts in the room
|
// processMessage updates the list of currently joined hosts in the room
|
||||||
// and then sends the event to the hosts that were joined before the event.
|
// and then sends the event to the hosts that were joined before the event.
|
||||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||||
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event.Event)
|
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(ore.AddsState()))
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
addsJoinedHosts, err := joinedHostsFromEvents(addsStateEvents)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,20 +78,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ev := output.NewRoomEvent.Event
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"event_id": ev.EventID(),
|
|
||||||
"room_id": ev.RoomID(),
|
|
||||||
"type": ev.Type(),
|
|
||||||
}).Info("received event from roomserver")
|
|
||||||
|
|
||||||
addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs}
|
|
||||||
var addQueryRes api.QueryEventsByIDResponse
|
|
||||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil {
|
|
||||||
log.Warn(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
|
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
|
||||||
var remQueryRes api.QueryEventsByIDResponse
|
var remQueryRes api.QueryEventsByIDResponse
|
||||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
|
if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
|
||||||
|
@ -100,9 +86,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var addQueryEvents, remQueryEvents []gomatrixserverlib.Event
|
var addQueryEvents, remQueryEvents []gomatrixserverlib.Event
|
||||||
for _, headeredEvent := range addQueryRes.Events {
|
for _, headeredEvent := range output.NewRoomEvent.AddsState() {
|
||||||
addQueryEvents = append(addQueryEvents, headeredEvent.Event)
|
addQueryEvents = append(addQueryEvents, headeredEvent.Event)
|
||||||
}
|
}
|
||||||
|
addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap())
|
||||||
for _, headeredEvent := range remQueryRes.Events {
|
for _, headeredEvent := range remQueryRes.Events {
|
||||||
remQueryEvents = append(remQueryEvents, headeredEvent.Event)
|
remQueryEvents = append(remQueryEvents, headeredEvent.Event)
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,10 +89,10 @@ type RoomserverInternalAPI interface {
|
||||||
) error
|
) error
|
||||||
|
|
||||||
// Query a given amount (or less) of events prior to a given set of events.
|
// Query a given amount (or less) of events prior to a given set of events.
|
||||||
QueryBackfill(
|
PerformBackfill(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *QueryBackfillRequest,
|
request *PerformBackfillRequest,
|
||||||
response *QueryBackfillResponse,
|
response *PerformBackfillResponse,
|
||||||
) error
|
) error
|
||||||
|
|
||||||
// Asks for the default room version as preferred by the server.
|
// Asks for the default room version as preferred by the server.
|
||||||
|
|
|
@ -63,6 +63,13 @@ type OutputNewRoomEvent struct {
|
||||||
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
// Together with RemovesStateEventIDs this allows the receiver to keep an up to date
|
||||||
// view of the current state of the room.
|
// view of the current state of the room.
|
||||||
AddsStateEventIDs []string `json:"adds_state_event_ids"`
|
AddsStateEventIDs []string `json:"adds_state_event_ids"`
|
||||||
|
// All extra newly added state events. This is only set if there are *extra* events
|
||||||
|
// other than `Event`. This can happen when forks get merged because state resolution
|
||||||
|
// may decide a bunch of state events on one branch are now valid, so they will be
|
||||||
|
// present in this list. This is useful when trying to maintain the current state of a room
|
||||||
|
// as to do so you need to include both these events and `Event`.
|
||||||
|
AddStateEvents []gomatrixserverlib.HeaderedEvent `json:"adds_state_events"`
|
||||||
|
|
||||||
// The state event IDs that were removed from the state of the room by this event.
|
// The state event IDs that were removed from the state of the room by this event.
|
||||||
RemovesStateEventIDs []string `json:"removes_state_event_ids"`
|
RemovesStateEventIDs []string `json:"removes_state_event_ids"`
|
||||||
// The ID of the event that was output before this event.
|
// The ID of the event that was output before this event.
|
||||||
|
@ -112,6 +119,26 @@ type OutputNewRoomEvent struct {
|
||||||
TransactionID *TransactionID `json:"transaction_id"`
|
TransactionID *TransactionID `json:"transaction_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddsState returns all added state events from this event.
|
||||||
|
//
|
||||||
|
// This function is needed because `AddStateEvents` will not include a copy of
|
||||||
|
// the original event to save space, so you cannot use that slice alone.
|
||||||
|
// Instead, use this function which will add the original event if it is present
|
||||||
|
// in `AddsStateEventIDs`.
|
||||||
|
func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent {
|
||||||
|
includeOutputEvent := false
|
||||||
|
for _, id := range ore.AddsStateEventIDs {
|
||||||
|
if id == ore.Event.EventID() {
|
||||||
|
includeOutputEvent = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !includeOutputEvent {
|
||||||
|
return ore.AddStateEvents
|
||||||
|
}
|
||||||
|
return append(ore.AddStateEvents, ore.Event)
|
||||||
|
}
|
||||||
|
|
||||||
// An OutputNewInviteEvent is written whenever an invite becomes active.
|
// An OutputNewInviteEvent is written whenever an invite becomes active.
|
||||||
// Invite events can be received outside of an existing room so have to be
|
// Invite events can be received outside of an existing room so have to be
|
||||||
// tracked separately from the room events themselves.
|
// tracked separately from the room events themselves.
|
||||||
|
|
|
@ -2,6 +2,7 @@ package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PerformJoinRequest struct {
|
type PerformJoinRequest struct {
|
||||||
|
@ -22,3 +23,31 @@ type PerformLeaveRequest struct {
|
||||||
|
|
||||||
type PerformLeaveResponse struct {
|
type PerformLeaveResponse struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PerformBackfillRequest is a request to PerformBackfill.
|
||||||
|
type PerformBackfillRequest struct {
|
||||||
|
// The room to backfill
|
||||||
|
RoomID string `json:"room_id"`
|
||||||
|
// A map of backwards extremity event ID to a list of its prev_event IDs.
|
||||||
|
BackwardsExtremities map[string][]string `json:"backwards_extremities"`
|
||||||
|
// The maximum number of events to retrieve.
|
||||||
|
Limit int `json:"limit"`
|
||||||
|
// The server interested in the events.
|
||||||
|
ServerName gomatrixserverlib.ServerName `json:"server_name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order.
|
||||||
|
func (r *PerformBackfillRequest) PrevEventIDs() []string {
|
||||||
|
var prevEventIDs []string
|
||||||
|
for _, pes := range r.BackwardsExtremities {
|
||||||
|
prevEventIDs = append(prevEventIDs, pes...)
|
||||||
|
}
|
||||||
|
prevEventIDs = util.UniqueStrings(prevEventIDs)
|
||||||
|
return prevEventIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
// PerformBackfillResponse is a response to PerformBackfill.
|
||||||
|
type PerformBackfillResponse struct {
|
||||||
|
// Missing events, arbritrary order.
|
||||||
|
Events []gomatrixserverlib.HeaderedEvent `json:"events"`
|
||||||
|
}
|
||||||
|
|
|
@ -18,7 +18,6 @@ package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState
|
// QueryLatestEventsAndStateRequest is a request to QueryLatestEventsAndState
|
||||||
|
@ -204,34 +203,6 @@ type QueryStateAndAuthChainResponse struct {
|
||||||
AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
|
AuthChainEvents []gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryBackfillRequest is a request to QueryBackfill.
|
|
||||||
type QueryBackfillRequest struct {
|
|
||||||
// The room to backfill
|
|
||||||
RoomID string `json:"room_id"`
|
|
||||||
// A map of backwards extremity event ID to a list of its prev_event IDs.
|
|
||||||
BackwardsExtremities map[string][]string `json:"backwards_extremities"`
|
|
||||||
// The maximum number of events to retrieve.
|
|
||||||
Limit int `json:"limit"`
|
|
||||||
// The server interested in the events.
|
|
||||||
ServerName gomatrixserverlib.ServerName `json:"server_name"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order.
|
|
||||||
func (r *QueryBackfillRequest) PrevEventIDs() []string {
|
|
||||||
var prevEventIDs []string
|
|
||||||
for _, pes := range r.BackwardsExtremities {
|
|
||||||
prevEventIDs = append(prevEventIDs, pes...)
|
|
||||||
}
|
|
||||||
prevEventIDs = util.UniqueStrings(prevEventIDs)
|
|
||||||
return prevEventIDs
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueryBackfillResponse is a response to QueryBackfill.
|
|
||||||
type QueryBackfillResponse struct {
|
|
||||||
// Missing events, arbritrary order.
|
|
||||||
Events []gomatrixserverlib.HeaderedEvent `json:"events"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueryRoomVersionCapabilitiesRequest asks for the default room version
|
// QueryRoomVersionCapabilitiesRequest asks for the default room version
|
||||||
type QueryRoomVersionCapabilitiesRequest struct{}
|
type QueryRoomVersionCapabilitiesRequest struct{}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package internal
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -310,24 +311,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
TransactionID: u.transactionID,
|
TransactionID: u.transactionID,
|
||||||
}
|
}
|
||||||
|
|
||||||
var stateEventNIDs []types.EventNID
|
eventIDMap, err := u.stateEventMap()
|
||||||
for _, entry := range u.added {
|
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
|
||||||
}
|
|
||||||
for _, entry := range u.removed {
|
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
|
||||||
}
|
|
||||||
for _, entry := range u.stateBeforeEventRemoves {
|
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
|
||||||
}
|
|
||||||
for _, entry := range u.stateBeforeEventAdds {
|
|
||||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
|
||||||
}
|
|
||||||
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
|
|
||||||
eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, entry := range u.added {
|
for _, entry := range u.added {
|
||||||
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
|
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
|
||||||
}
|
}
|
||||||
|
@ -342,12 +330,60 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
||||||
}
|
}
|
||||||
ore.SendAsServer = u.sendAsServer
|
ore.SendAsServer = u.sendAsServer
|
||||||
|
|
||||||
|
// include extra state events if they were added as nearly every downstream component will care about it
|
||||||
|
// and we'd rather not have them all hit QueryEventsByID at the same time!
|
||||||
|
if len(ore.AddsStateEventIDs) > 0 {
|
||||||
|
ore.AddStateEvents, err = u.extraEventsForIDs(roomVersion, ore.AddsStateEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &api.OutputEvent{
|
return &api.OutputEvent{
|
||||||
Type: api.OutputTypeNewRoomEvent,
|
Type: api.OutputTypeNewRoomEvent,
|
||||||
NewRoomEvent: &ore,
|
NewRoomEvent: &ore,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extraEventsForIDs returns the full events for the event IDs given, but does not include the current event being
|
||||||
|
// updated.
|
||||||
|
func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||||
|
var extraEventIDs []string
|
||||||
|
for _, e := range eventIDs {
|
||||||
|
if e == u.event.EventID() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
extraEventIDs = append(extraEventIDs, e)
|
||||||
|
}
|
||||||
|
if len(extraEventIDs) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var h []gomatrixserverlib.HeaderedEvent
|
||||||
|
for _, e := range extraEvents {
|
||||||
|
h = append(h, e.Headered(roomVersion))
|
||||||
|
}
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// retrieve an event nid -> event ID map for all events that need updating
|
||||||
|
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
|
||||||
|
var stateEventNIDs []types.EventNID
|
||||||
|
var allStateEntries []types.StateEntry
|
||||||
|
allStateEntries = append(allStateEntries, u.added...)
|
||||||
|
allStateEntries = append(allStateEntries, u.removed...)
|
||||||
|
allStateEntries = append(allStateEntries, u.stateBeforeEventRemoves...)
|
||||||
|
allStateEntries = append(allStateEntries, u.stateBeforeEventAdds...)
|
||||||
|
for _, entry := range allStateEntries {
|
||||||
|
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||||
|
}
|
||||||
|
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
|
||||||
|
return u.api.DB.EventIDs(u.ctx, stateEventNIDs)
|
||||||
|
}
|
||||||
|
|
||||||
type eventNIDSorter []types.EventNID
|
type eventNIDSorter []types.EventNID
|
||||||
|
|
||||||
func (s eventNIDSorter) Len() int { return len(s) }
|
func (s eventNIDSorter) Len() int { return len(s) }
|
||||||
|
|
|
@ -441,11 +441,11 @@ func (r *RoomserverInternalAPI) QueryMissingEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryBackfill implements api.RoomServerQueryAPI
|
// PerformBackfill implements api.RoomServerQueryAPI
|
||||||
func (r *RoomserverInternalAPI) QueryBackfill(
|
func (r *RoomserverInternalAPI) PerformBackfill(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryBackfillRequest,
|
request *api.PerformBackfillRequest,
|
||||||
response *api.QueryBackfillResponse,
|
response *api.PerformBackfillResponse,
|
||||||
) error {
|
) error {
|
||||||
// if we are requesting the backfill then we need to do a federation hit
|
// if we are requesting the backfill then we need to do a federation hit
|
||||||
// TODO: we could be more sensible and fetch as many events we already have then request the rest
|
// TODO: we could be more sensible and fetch as many events we already have then request the rest
|
||||||
|
@ -489,7 +489,7 @@ func (r *RoomserverInternalAPI) QueryBackfill(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.QueryBackfillRequest, res *api.QueryBackfillResponse) error {
|
func (r *RoomserverInternalAPI) backfillViaFederation(ctx context.Context, req *api.PerformBackfillRequest, res *api.PerformBackfillResponse) error {
|
||||||
roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
|
roomVer, err := r.DB.GetRoomVersionForRoom(ctx, req.RoomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
|
return fmt.Errorf("backfillViaFederation: unknown room version for room %s : %w", req.RoomID, err)
|
||||||
|
@ -647,7 +647,7 @@ func (r *RoomserverInternalAPI) scanEventTree(
|
||||||
var pre string
|
var pre string
|
||||||
|
|
||||||
// TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be)
|
// TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be)
|
||||||
// Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing
|
// Currently, callers like PerformBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing
|
||||||
// so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in
|
// so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in
|
||||||
// duplicate events being sent in response to /backfill requests.
|
// duplicate events being sent in response to /backfill requests.
|
||||||
initialIgnoreList := make(map[string]bool, len(visited))
|
initialIgnoreList := make(map[string]bool, len(visited))
|
||||||
|
|
|
@ -26,6 +26,7 @@ const (
|
||||||
// Perform operations
|
// Perform operations
|
||||||
RoomserverPerformJoinPath = "/roomserver/performJoin"
|
RoomserverPerformJoinPath = "/roomserver/performJoin"
|
||||||
RoomserverPerformLeavePath = "/roomserver/performLeave"
|
RoomserverPerformLeavePath = "/roomserver/performLeave"
|
||||||
|
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
|
||||||
|
|
||||||
// Query operations
|
// Query operations
|
||||||
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
|
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
|
||||||
|
@ -36,7 +37,6 @@ const (
|
||||||
RoomserverQueryServerAllowedToSeeEventPath = "/roomserver/queryServerAllowedToSeeEvent"
|
RoomserverQueryServerAllowedToSeeEventPath = "/roomserver/queryServerAllowedToSeeEvent"
|
||||||
RoomserverQueryMissingEventsPath = "/roomserver/queryMissingEvents"
|
RoomserverQueryMissingEventsPath = "/roomserver/queryMissingEvents"
|
||||||
RoomserverQueryStateAndAuthChainPath = "/roomserver/queryStateAndAuthChain"
|
RoomserverQueryStateAndAuthChainPath = "/roomserver/queryStateAndAuthChain"
|
||||||
RoomserverQueryBackfillPath = "/roomserver/queryBackfill"
|
|
||||||
RoomserverQueryRoomVersionCapabilitiesPath = "/roomserver/queryRoomVersionCapabilities"
|
RoomserverQueryRoomVersionCapabilitiesPath = "/roomserver/queryRoomVersionCapabilities"
|
||||||
RoomserverQueryRoomVersionForRoomPath = "/roomserver/queryRoomVersionForRoom"
|
RoomserverQueryRoomVersionForRoomPath = "/roomserver/queryRoomVersionForRoom"
|
||||||
)
|
)
|
||||||
|
@ -274,16 +274,16 @@ func (h *httpRoomserverInternalAPI) QueryStateAndAuthChain(
|
||||||
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryBackfill implements RoomServerQueryAPI
|
// PerformBackfill implements RoomServerQueryAPI
|
||||||
func (h *httpRoomserverInternalAPI) QueryBackfill(
|
func (h *httpRoomserverInternalAPI) PerformBackfill(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryBackfillRequest,
|
request *api.PerformBackfillRequest,
|
||||||
response *api.QueryBackfillResponse,
|
response *api.PerformBackfillResponse,
|
||||||
) error {
|
) error {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformBackfill")
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
apiURL := h.roomserverURL + RoomserverQueryBackfillPath
|
apiURL := h.roomserverURL + RoomserverPerformBackfillPath
|
||||||
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return internalHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -165,14 +165,14 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
internalAPIMux.Handle(
|
internalAPIMux.Handle(
|
||||||
RoomserverQueryBackfillPath,
|
RoomserverPerformBackfillPath,
|
||||||
internal.MakeInternalAPI("QueryBackfill", func(req *http.Request) util.JSONResponse {
|
internal.MakeInternalAPI("PerformBackfill", func(req *http.Request) util.JSONResponse {
|
||||||
var request api.QueryBackfillRequest
|
var request api.PerformBackfillRequest
|
||||||
var response api.QueryBackfillResponse
|
var response api.PerformBackfillResponse
|
||||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
if err := r.QueryBackfill(req.Context(), &request, &response); err != nil {
|
if err := r.PerformBackfill(req.Context(), &request, &response); err != nil {
|
||||||
return util.ErrorResponse(err)
|
return util.ErrorResponse(err)
|
||||||
}
|
}
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
|
|
@ -17,7 +17,6 @@ package consumers
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
@ -105,17 +104,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
"room_version": ev.RoomVersion,
|
"room_version": ev.RoomVersion,
|
||||||
}).Info("received event from roomserver")
|
}).Info("received event from roomserver")
|
||||||
|
|
||||||
addsStateEvents, err := s.lookupStateEvents(msg.AddsStateEventIDs, ev)
|
addsStateEvents := msg.AddsState()
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
"event": string(ev.JSON()),
|
|
||||||
log.ErrorKey: err,
|
|
||||||
"add": msg.AddsStateEventIDs,
|
|
||||||
"del": msg.RemovesStateEventIDs,
|
|
||||||
}).Panicf("roomserver output log: state event lookup failure")
|
|
||||||
}
|
|
||||||
|
|
||||||
ev, err = s.updateStateEvent(ev)
|
ev, err := s.updateStateEvent(ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -185,63 +176,6 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookupStateEvents looks up the state events that are added by a new event.
|
|
||||||
func (s *OutputRoomEventConsumer) lookupStateEvents(
|
|
||||||
addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
|
|
||||||
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
|
||||||
// Fast path if there aren't any new state events.
|
|
||||||
if len(addsStateEventIDs) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fast path if the only state event added is the event itself.
|
|
||||||
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
|
||||||
return []gomatrixserverlib.HeaderedEvent{event}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if this is re-adding a state events that we previously processed
|
|
||||||
// If we have previously received a state event it may still be in
|
|
||||||
// our event database.
|
|
||||||
result, err := s.db.Events(context.TODO(), addsStateEventIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
missing := missingEventsFrom(result, addsStateEventIDs)
|
|
||||||
|
|
||||||
// Check if event itself is being added.
|
|
||||||
for _, eventID := range missing {
|
|
||||||
if eventID == event.EventID() {
|
|
||||||
result = append(result, event)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
missing = missingEventsFrom(result, addsStateEventIDs)
|
|
||||||
|
|
||||||
if len(missing) == 0 {
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point the missing events are neither the event itself nor are
|
|
||||||
// they present in our local database. Our only option is to fetch them
|
|
||||||
// from the roomserver using the query API.
|
|
||||||
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
|
|
||||||
var eventResp api.QueryEventsByIDResponse
|
|
||||||
if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
result = append(result, eventResp.Events...)
|
|
||||||
missing = missingEventsFrom(result, addsStateEventIDs)
|
|
||||||
|
|
||||||
if len(missing) != 0 {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"missing %d state events IDs at event %q", len(missing), event.EventID(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
|
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
|
||||||
var stateKey string
|
var stateKey string
|
||||||
if event.StateKey() == nil {
|
if event.StateKey() == nil {
|
||||||
|
@ -270,17 +204,3 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Heade
|
||||||
event.Event, err = event.SetUnsigned(prev)
|
event.Event, err = event.SetUnsigned(prev)
|
||||||
return event, err
|
return event, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func missingEventsFrom(events []gomatrixserverlib.HeaderedEvent, required []string) []string {
|
|
||||||
have := map[string]bool{}
|
|
||||||
for _, event := range events {
|
|
||||||
have[event.EventID()] = true
|
|
||||||
}
|
|
||||||
var missing []string
|
|
||||||
for _, eventID := range required {
|
|
||||||
if !have[eventID] {
|
|
||||||
missing = append(missing, eventID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return missing
|
|
||||||
}
|
|
||||||
|
|
|
@ -375,15 +375,15 @@ func (e eventsByDepth) Less(i, j int) bool {
|
||||||
// Returns an error if there was an issue with retrieving the list of servers in
|
// Returns an error if there was an issue with retrieving the list of servers in
|
||||||
// the room or sending the request.
|
// the room or sending the request.
|
||||||
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
|
func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||||
var res api.QueryBackfillResponse
|
var res api.PerformBackfillResponse
|
||||||
err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
|
err := r.rsAPI.PerformBackfill(context.Background(), &api.PerformBackfillRequest{
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
BackwardsExtremities: backwardsExtremities,
|
BackwardsExtremities: backwardsExtremities,
|
||||||
Limit: limit,
|
Limit: limit,
|
||||||
ServerName: r.cfg.Matrix.ServerName,
|
ServerName: r.cfg.Matrix.ServerName,
|
||||||
}, &res)
|
}, &res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("QueryBackfill failed: %w", err)
|
return nil, fmt.Errorf("PerformBackfill failed: %w", err)
|
||||||
}
|
}
|
||||||
util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")
|
util.GetLogger(r.ctx).WithField("new_events", len(res.Events)).Info("Storing new events from backfill")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue