diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 99022074..e62855ab 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -74,28 +74,17 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q containing %d PDUs, %d EDUs", txnID, len(t.PDUs), len(t.EDUs)) resp, err := t.processTransaction() - switch err.(type) { - // No error? Great! Send back a 200. - case nil: - return util.JSONResponse{ - Code: http.StatusOK, - JSON: resp, - } - // Handle known error cases as we will return a 400 error for these. - case roomNotFoundError: - case unmarshalError: - case verifySigError: - // Handle unknown error cases. Sending 500 errors back should be a last - // resort as this can make other homeservers back off sending federation - // events. - default: + if err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed") - return jsonerror.InternalServerError() } - // Return a 400 error for bad requests as fallen through from above. + + // https://matrix.org/docs/spec/server_server/r0.1.3#put-matrix-federation-v1-send-txnid + // Status code 200: + // The result of processing the transaction. The server is to use this response + // even in the event of one or more PDUs failing to be processed. return util.JSONResponse{ - Code: http.StatusBadRequest, - JSON: jsonerror.BadJSON(err.Error()), + Code: http.StatusOK, + JSON: resp, } } @@ -135,30 +124,39 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { } if err := json.Unmarshal(pdu, &header); err != nil { util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event") - return nil, unmarshalError{err} + // We don't know the event ID at this point so we can't return the + // failure in the PDU results + continue } verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID} verRes := api.QueryRoomVersionForRoomResponse{} if err := t.rsAPI.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil { util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID) - return nil, roomNotFoundError{verReq.RoomID} + // We don't know the event ID at this point so we can't return the + // failure in the PDU results + continue } event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion) if err != nil { util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID()) - return nil, unmarshalError{err} + results[event.EventID()] = gomatrixserverlib.PDUResult{ + Error: err.Error(), + } + continue } - if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { + if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) - return nil, verifySigError{event.EventID(), err} + results[event.EventID()] = gomatrixserverlib.PDUResult{ + Error: err.Error(), + } + continue } pdus = append(pdus, event.Headered(verRes.RoomVersion)) } // Process the events. for _, e := range pdus { - err := t.processEvent(e.Unwrap(), true) - if err != nil { + if err := t.processEvent(e.Unwrap(), true); err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the // sender knows that we have skipped processing it. @@ -174,20 +172,17 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { // receive another event referencing it. // If we bail and stop processing then we risk wedging incoming // transactions from that server forever. - switch err.(type) { - case roomNotFoundError: - case *gomatrixserverlib.NotAllowed: - case missingPrevEventsError: - default: - util.GetLogger(t.context).Warnf("Processing %s failed: %s", e.EventID(), err) + if isProcessingErrorFatal(err) { // Any other error should be the result of a temporary error in // our server so we should bail processing the transaction entirely. + util.GetLogger(t.context).Warnf("Processing %s failed fatally: %s", e.EventID(), err) return nil, err + } else { + util.GetLogger(t.context).WithError(err).WithField("event_id", e.EventID()).Warn("Failed to process incoming federation event, skipping") + results[e.EventID()] = gomatrixserverlib.PDUResult{ + Error: err.Error(), + } } - results[e.EventID()] = gomatrixserverlib.PDUResult{ - Error: err.Error(), - } - util.GetLogger(t.context).WithError(err).WithField("event_id", e.EventID()).Warn("Failed to process incoming federation event, skipping it.") } else { results[e.EventID()] = gomatrixserverlib.PDUResult{} } @@ -198,6 +193,25 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { return &gomatrixserverlib.RespSend{PDUs: results}, nil } +// isProcessingErrorFatal returns true if the error is really bad and +// we should stop processing the transaction, and returns false if it +// is just some less serious error about a specific event. +func isProcessingErrorFatal(err error) bool { + switch err.(type) { + case roomNotFoundError: + case *gomatrixserverlib.NotAllowed: + case missingPrevEventsError: + default: + switch err { + case context.Canceled: + case context.DeadlineExceeded: + default: + return true + } + } + return false +} + type roomNotFoundError struct { roomID string }