From 31d3b0d4a52aa52b05f85069ae3f08f9fc8b64ab Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 5 May 2020 15:48:37 +0100 Subject: [PATCH] Prefer /state_ids when missing state across federation (#1008) * Prefer /state_ids when missing state across federation * Linting * Better logging --- clientapi/producers/roomserver.go | 2 +- federationapi/routing/send.go | 134 +++++++++++++++++++++++++++--- roomserver/api/query.go | 2 +- 3 files changed, 126 insertions(+), 12 deletions(-) diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go index a804abfe..7eee83f5 100644 --- a/clientapi/producers/roomserver.go +++ b/clientapi/producers/roomserver.go @@ -54,7 +54,7 @@ func (c *RoomserverProducer) SendEvents( // SendEventWithState writes an event with KindNew to the roomserver input log // with the state at the event as KindOutlier before it. func (c *RoomserverProducer) SendEventWithState( - ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, + ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, ) error { outliers, err := state.Events() if err != nil { diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 88411b81..a6072c66 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) // Send implements /_matrix/federation/v1/send/{txnID} @@ -297,22 +298,27 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer // However not all version of synapse support /state_ids so you may // need to fallback to /state. // TODO: Attempt to fill in the gap using /get_missing_events - // TODO: Attempt to fetch the state using /state_ids and /events - state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion) + + // Attempt to fetch the missing state using /state_ids and /events + var respState *gomatrixserverlib.RespState + var err error + respState, err = t.lookupMissingStateViaStateIDs(e, roomVersion) if err != nil { - return err - } - // Check that the returned state is valid. - if err := state.Check(t.context, t.keys); err != nil { - return err + // Fallback to /state + util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state") + respState, err = t.lookupMissingStateViaState(e, roomVersion) + if err != nil { + return err + } } + // Check that the event is allowed by the state. retryAllowedState: - if err := checkAllowedByState(e, state.StateEvents); err != nil { + if err := checkAllowedByState(e, respState.StateEvents); err != nil { switch missing := err.(type) { case gomatrixserverlib.MissingAuthEventError: // An auth event was missing so let's look up that event over federation - for _, s := range state.StateEvents { + for _, s := range respState.StateEvents { if s.EventID() != missing.AuthEventID { continue } @@ -329,5 +335,113 @@ retryAllowedState: } // pass the event along with the state to the roomserver - return t.producer.SendEventWithState(t.context, state, e.Headered(roomVersion)) + return t.producer.SendEventWithState(t.context, respState, e.Headered(roomVersion)) +} + +func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( + respState *gomatrixserverlib.RespState, err error) { + state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion) + if err != nil { + return nil, err + } + // Check that the returned state is valid. + if err := state.Check(t.context, t.keys); err != nil { + return nil, err + } + return &state, nil +} + +func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( + *gomatrixserverlib.RespState, error) { + + // fetch all the state events we do know about: the current state + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: e.RoomID(), + StateToFetch: nil, // fetch all state + } + var queryRes api.QueryLatestEventsAndStateResponse + if err := t.rsAPI.QueryLatestEventsAndState(t.context, &queryReq, &queryRes); err != nil { + return nil, err + } + if !queryRes.RoomExists { + return nil, fmt.Errorf("room %s doesn't exist", e.RoomID()) + } + // allow indexing of current state by event ID + haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(queryRes.StateEvents)) + for i := range queryRes.StateEvents { + haveEventMap[queryRes.StateEvents[i].EventID()] = &queryRes.StateEvents[i] + } + + // fetch the state event IDs at the time of the event + stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID()) + if err != nil { + return nil, err + } + // work out which auth/state IDs are missing + wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...) + missing := make(map[string]bool) + for _, sid := range wantIDs { + if _, ok := haveEventMap[sid]; !ok { + missing[sid] = true + } + } + util.GetLogger(t.context).WithFields(logrus.Fields{ + "missing": len(missing), + "event_id": e.EventID(), + "room_id": e.RoomID(), + "already_have": len(queryRes.StateEvents), + "total_state": len(stateIDs.StateEventIDs), + "total_auth_events": len(stateIDs.AuthEventIDs), + }).Info("Fetching missing state at event") + + for missingEventID := range missing { + txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID) + if err != nil { + util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID") + return nil, err + } + for _, pdu := range txn.PDUs { + event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) + if err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID()) + return nil, unmarshalError{err} + } + if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) + return nil, verifySigError{event.EventID(), err} + } + h := event.Headered(roomVersion) + haveEventMap[event.EventID()] = &h + } + } + return t.createRespStateFromStateIDs(stateIDs, haveEventMap) +} + +func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) ( + *gomatrixserverlib.RespState, error) { + // create a RespState response using the response to /state_ids as a guide + respState := gomatrixserverlib.RespState{ + AuthEvents: make([]gomatrixserverlib.Event, len(stateIDs.AuthEventIDs)), + StateEvents: make([]gomatrixserverlib.Event, len(stateIDs.StateEventIDs)), + } + + for i := range stateIDs.StateEventIDs { + ev, ok := haveEventMap[stateIDs.StateEventIDs[i]] + if !ok { + return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i]) + } + respState.StateEvents[i] = ev.Unwrap() + } + for i := range stateIDs.AuthEventIDs { + ev, ok := haveEventMap[stateIDs.AuthEventIDs[i]] + if !ok { + return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i]) + } + respState.AuthEvents[i] = ev.Unwrap() + } + // Check that the returned state is valid. + if err := respState.Check(t.context, t.keys); err != nil { + return nil, err + } + return &respState, nil } diff --git a/roomserver/api/query.go b/roomserver/api/query.go index cb7cbb86..9afc51f4 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -29,7 +29,7 @@ type QueryLatestEventsAndStateRequest struct { // The room ID to query the latest events for. RoomID string `json:"room_id"` // The state key tuples to fetch from the room current state. - // If this list is empty or nil then no state events are returned. + // If this list is empty or nil then *ALL* current state events are returned. StateToFetch []gomatrixserverlib.StateKeyTuple `json:"state_to_fetch"` }