Add PerformInvite and refactor how errors get handled (#1158)

* Add PerformInvite and refactor how errors get handled

- Rename `JoinError` to `PerformError`
- Remove `error` from the API function signature entirely. This forces
  errors to be bundled into `PerformError` which makes it easier for callers
  to detect and handle errors. On network errors, HTTP clients will make a
  `PerformError`.

* Unbreak everything; thanks Go!

* Send back JSONResponse according to the PerformError

* Update federation invite code too
main
Kegsay 2020-06-24 15:06:14 +01:00 committed by GitHub
parent ebaaf65c54
commit 002fe05a20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 469 additions and 332 deletions

View File

@ -403,15 +403,15 @@ func createRoom(
} }
} }
// Send the invite event to the roomserver. // Send the invite event to the roomserver.
if err = roomserverAPI.SendInvite( if perr := roomserverAPI.SendInvite(
req.Context(), rsAPI, req.Context(), rsAPI,
inviteEvent.Headered(roomVersion), inviteEvent.Headered(roomVersion),
strippedState, // invite room state strippedState, // invite room state
cfg.Matrix.ServerName, // send as server cfg.Matrix.ServerName, // send as server
nil, // transaction ID nil, // transaction ID
); err != nil { ); perr != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed") util.GetLogger(req.Context()).WithError(perr).Error("SendInvite failed")
return jsonerror.InternalServerError() return perr.JSONResponse()
} }
} }

View File

@ -15,12 +15,10 @@
package routing package routing
import ( import (
"errors"
"net/http" "net/http"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/accounts"
@ -65,32 +63,9 @@ func JoinRoomByIDOrAlias(
} }
// Ask the roomserver to perform the join. // Ask the roomserver to perform the join.
err = rsAPI.PerformJoin(req.Context(), &joinReq, &joinRes) rsAPI.PerformJoin(req.Context(), &joinReq, &joinRes)
// Handle known errors first, if this is 0 then there will be no matches (eg on success) if joinRes.Error != nil {
switch joinRes.Error { return joinRes.Error.JSONResponse()
case roomserverAPI.JoinErrorBadRequest:
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown(joinRes.ErrMsg),
}
case roomserverAPI.JoinErrorNoRoom:
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound(joinRes.ErrMsg),
}
case roomserverAPI.JoinErrorNotAllowed:
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden(joinRes.ErrMsg),
}
}
// this is always populated on generic errors
if joinRes.ErrMsg != "" {
return util.ErrorResponse(errors.New(joinRes.ErrMsg))
}
// this is set on network errors in polylith mode
if err != nil {
return util.ErrorResponse(err)
} }
return util.JSONResponse{ return util.JSONResponse{

View File

@ -111,16 +111,16 @@ func SendMembership(
switch membership { switch membership {
case gomatrixserverlib.Invite: case gomatrixserverlib.Invite:
// Invites need to be handled specially // Invites need to be handled specially
err = roomserverAPI.SendInvite( perr := roomserverAPI.SendInvite(
req.Context(), rsAPI, req.Context(), rsAPI,
event.Headered(verRes.RoomVersion), event.Headered(verRes.RoomVersion),
nil, // ask the roomserver to draw up invite room state for us nil, // ask the roomserver to draw up invite room state for us
cfg.Matrix.ServerName, cfg.Matrix.ServerName,
nil, nil,
) )
if err != nil { if perr != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendInvite failed") util.GetLogger(req.Context()).WithError(perr).Error("producer.SendInvite failed")
return jsonerror.InternalServerError() return perr.JSONResponse()
} }
case gomatrixserverlib.Join: case gomatrixserverlib.Join:
// The join membership requires the room id to be sent in the response // The join membership requires the room id to be sent in the response

View File

@ -98,15 +98,15 @@ func Invite(
) )
// Add the invite event to the roomserver. // Add the invite event to the roomserver.
if err = api.SendInvite( if perr := api.SendInvite(
httpReq.Context(), rsAPI, httpReq.Context(), rsAPI,
signedEvent.Headered(inviteReq.RoomVersion()), signedEvent.Headered(inviteReq.RoomVersion()),
inviteReq.InviteRoomState(), inviteReq.InviteRoomState(),
event.Origin(), event.Origin(),
nil, nil,
); err != nil { ); perr != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed") util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed")
return jsonerror.InternalServerError() return perr.JSONResponse()
} }
// Return the signed event to the originating server, it should then tell // Return the signed event to the originating server, it should then tell

View File

@ -97,12 +97,18 @@ func (t *testRoomserverAPI) InputRoomEvents(
return nil return nil
} }
func (t *testRoomserverAPI) PerformInvite(
ctx context.Context,
req *api.PerformInviteRequest,
res *api.PerformInviteResponse,
) {
}
func (t *testRoomserverAPI) PerformJoin( func (t *testRoomserverAPI) PerformJoin(
ctx context.Context, ctx context.Context,
req *api.PerformJoinRequest, req *api.PerformJoinRequest,
res *api.PerformJoinResponse, res *api.PerformJoinResponse,
) error { ) {
return nil
} }
func (t *testRoomserverAPI) PerformLeave( func (t *testRoomserverAPI) PerformLeave(

View File

@ -18,11 +18,17 @@ type RoomserverInternalAPI interface {
response *InputRoomEventsResponse, response *InputRoomEventsResponse,
) error ) error
PerformInvite(
ctx context.Context,
req *PerformInviteRequest,
res *PerformInviteResponse,
)
PerformJoin( PerformJoin(
ctx context.Context, ctx context.Context,
req *PerformJoinRequest, req *PerformJoinRequest,
res *PerformJoinResponse, res *PerformJoinResponse,
) error )
PerformLeave( PerformLeave(
ctx context.Context, ctx context.Context,

View File

@ -29,14 +29,22 @@ func (t *RoomserverInternalAPITrace) InputRoomEvents(
return err return err
} }
func (t *RoomserverInternalAPITrace) PerformInvite(
ctx context.Context,
req *PerformInviteRequest,
res *PerformInviteResponse,
) {
t.Impl.PerformInvite(ctx, req, res)
util.GetLogger(ctx).Infof("PerformInvite req=%+v res=%+v", js(req), js(res))
}
func (t *RoomserverInternalAPITrace) PerformJoin( func (t *RoomserverInternalAPITrace) PerformJoin(
ctx context.Context, ctx context.Context,
req *PerformJoinRequest, req *PerformJoinRequest,
res *PerformJoinResponse, res *PerformJoinResponse,
) error { ) {
err := t.Impl.PerformJoin(ctx, req, res) t.Impl.PerformJoin(ctx, req, res)
util.GetLogger(ctx).WithError(err).Infof("PerformJoin req=%+v res=%+v", js(req), js(res)) util.GetLogger(ctx).Infof("PerformJoin req=%+v res=%+v", js(req), js(res))
return err
} }
func (t *RoomserverInternalAPITrace) PerformLeave( func (t *RoomserverInternalAPITrace) PerformLeave(

View File

@ -76,21 +76,9 @@ type TransactionID struct {
TransactionID string `json:"id"` TransactionID string `json:"id"`
} }
// InputInviteEvent is a matrix invite event received over federation without
// the usual context a matrix room event would have. We usually do not have
// access to the events needed to check the event auth rules for the invite.
type InputInviteEvent struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
SendAsServer string `json:"send_as_server"`
TransactionID *TransactionID `json:"transaction_id"`
}
// InputRoomEventsRequest is a request to InputRoomEvents // InputRoomEventsRequest is a request to InputRoomEvents
type InputRoomEventsRequest struct { type InputRoomEventsRequest struct {
InputRoomEvents []InputRoomEvent `json:"input_room_events"` InputRoomEvents []InputRoomEvent `json:"input_room_events"`
InputInviteEvents []InputInviteEvent `json:"input_invite_events"`
} }
// InputRoomEventsResponse is a response to InputRoomEvents // InputRoomEventsResponse is a response to InputRoomEvents

View File

@ -1,19 +1,57 @@
package api package api
import ( import (
"fmt"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
type JoinError int type PerformErrorCode int
type PerformError struct {
Msg string
Code PerformErrorCode
}
func (p *PerformError) Error() string {
return fmt.Sprintf("%d : %s", p.Code, p.Msg)
}
// JSONResponse maps error codes to suitable HTTP error codes, defaulting to 500.
func (p *PerformError) JSONResponse() util.JSONResponse {
switch p.Code {
case PerformErrorBadRequest:
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.Unknown(p.Msg),
}
case PerformErrorNoRoom:
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound(p.Msg),
}
case PerformErrorNotAllowed:
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden(p.Msg),
}
default:
return util.ErrorResponse(p)
}
}
const ( const (
// JoinErrorNotAllowed means the user is not allowed to join this room (e.g join_rule:invite or banned) // PerformErrorNotAllowed means the user is not allowed to invite/join/etc this room (e.g join_rule:invite or banned)
JoinErrorNotAllowed JoinError = 1 PerformErrorNotAllowed PerformErrorCode = 1
// JoinErrorBadRequest means the request was wrong in some way (invalid user ID, wrong server, etc) // PerformErrorBadRequest means the request was wrong in some way (invalid user ID, wrong server, etc)
JoinErrorBadRequest JoinError = 2 PerformErrorBadRequest PerformErrorCode = 2
// JoinErrorNoRoom means that the room being joined doesn't exist. // PerformErrorNoRoom means that the room being joined doesn't exist.
JoinErrorNoRoom JoinError = 3 PerformErrorNoRoom PerformErrorCode = 3
// PerformErrorNoOperation means that the request resulted in nothing happening e.g invite->invite or leave->leave.
PerformErrorNoOperation PerformErrorCode = 4
) )
type PerformJoinRequest struct { type PerformJoinRequest struct {
@ -26,10 +64,8 @@ type PerformJoinRequest struct {
type PerformJoinResponse struct { type PerformJoinResponse struct {
// The room ID, populated on success. // The room ID, populated on success.
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
// The reason why the join failed. Can be blank. // If non-nil, the join request failed. Contains more information why it failed.
Error JoinError `json:"error"` Error *PerformError
// Debugging description of the error. Always present on failure.
ErrMsg string `json:"err_msg"`
} }
type PerformLeaveRequest struct { type PerformLeaveRequest struct {
@ -40,6 +76,19 @@ type PerformLeaveRequest struct {
type PerformLeaveResponse struct { type PerformLeaveResponse struct {
} }
type PerformInviteRequest struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
SendAsServer string `json:"send_as_server"`
TransactionID *TransactionID `json:"transaction_id"`
}
type PerformInviteResponse struct {
// If non-nil, the invite request failed. Contains more information why it failed.
Error *PerformError
}
// PerformBackfillRequest is a request to PerformBackfill. // PerformBackfillRequest is a request to PerformBackfill.
type PerformBackfillRequest struct { type PerformBackfillRequest struct {
// The room to backfill // The room to backfill

View File

@ -98,16 +98,20 @@ func SendInvite(
rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent, rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState, inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) error { ) *PerformError {
request := InputRoomEventsRequest{ request := PerformInviteRequest{
InputInviteEvents: []InputInviteEvent{{
Event: inviteEvent, Event: inviteEvent,
InviteRoomState: inviteRoomState, InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion, RoomVersion: inviteEvent.RoomVersion,
SendAsServer: string(sendAsServer), SendAsServer: string(sendAsServer),
TransactionID: txnID, TransactionID: txnID,
}},
} }
var response InputRoomEventsResponse var response PerformInviteResponse
return rsAPI.InputRoomEvents(ctx, &request, &response) rsAPI.PerformInvite(ctx, &request, &response)
// we need to do this because many places people will use `var err error` as the return
// arg and a nil interface != nil pointer to a concrete interface (in this case PerformError)
if response.Error != nil && response.Error.Msg != "" {
return response.Error
}
return nil
} }

View File

@ -74,18 +74,6 @@ func (r *RoomserverInternalAPI) InputRoomEvents(
// We lock as processRoomEvent can only be called once at a time // We lock as processRoomEvent can only be called once at a time
r.mutex.Lock() r.mutex.Lock()
defer r.mutex.Unlock() defer r.mutex.Unlock()
for i := range request.InputInviteEvents {
var loopback *api.InputRoomEvent
if loopback, err = r.processInviteEvent(ctx, r, request.InputInviteEvents[i]); err != nil {
return err
}
// The processInviteEvent function can optionally return a
// loopback room event containing the invite, for local invites.
// If it does, we should process it with the room events below.
if loopback != nil {
request.InputRoomEvents = append(request.InputRoomEvents, *loopback)
}
}
for i := range request.InputRoomEvents { for i := range request.InputRoomEvents {
if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil { if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil {
return err return err

View File

@ -18,17 +18,12 @@ package internal
import ( import (
"context" "context"
"errors"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
) )
// processRoomEvent can only be called once at a time // processRoomEvent can only be called once at a time
@ -148,193 +143,3 @@ func (r *RoomserverInternalAPI) calculateAndSetState(
} }
return r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) return r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
} }
func (r *RoomserverInternalAPI) processInviteEvent(
ctx context.Context,
ow *RoomserverInternalAPI,
input api.InputInviteEvent,
) (*api.InputRoomEvent, error) {
if input.Event.StateKey() == nil {
return nil, fmt.Errorf("invite must be a state event")
}
roomID := input.Event.RoomID()
targetUserID := *input.Event.StateKey()
log.WithFields(log.Fields{
"event_id": input.Event.EventID(),
"room_id": roomID,
"room_version": input.RoomVersion,
"target_user_id": targetUserID,
}).Info("processing invite event")
_, domain, _ := gomatrixserverlib.SplitID('@', targetUserID)
isTargetLocalUser := domain == r.Cfg.Matrix.ServerName
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocalUser, input.RoomVersion)
if err != nil {
return nil, err
}
succeeded := false
defer func() {
txerr := sqlutil.EndTransaction(updater, &succeeded)
if err == nil && txerr != nil {
err = txerr
}
}()
if updater.IsJoin() {
// If the user is joined to the room then that takes precedence over this
// invite event. It makes little sense to move a user that is already
// joined to the room into the invite state.
// This could plausibly happen if an invite request raced with a join
// request for a user. For example if a user was invited to a public
// room and they joined the room at the same time as the invite was sent.
// The other way this could plausibly happen is if an invite raced with
// a kick. For example if a user was kicked from a room in error and in
// response someone else in the room re-invited them then it is possible
// for the invite request to race with the leave event so that the
// target receives invite before it learns that it has been kicked.
// There are a few ways this could be plausibly handled in the roomserver.
// 1) Store the invite, but mark it as retired. That will result in the
// permanent rejection of that invite event. So even if the target
// user leaves the room and the invite is retransmitted it will be
// ignored. However a new invite with a new event ID would still be
// accepted.
// 2) Silently discard the invite event. This means that if the event
// was retransmitted at a later date after the target user had left
// the room we would accept the invite. However since we hadn't told
// the sending server that the invite had been discarded it would
// have no reason to attempt to retry.
// 3) Signal the sending server that the user is already joined to the
// room.
// For now we will implement option 2. Since in the abesence of a retry
// mechanism it will be equivalent to option 1, and we don't have a
// signalling mechanism to implement option 3.
return nil, nil
}
// Normally, with a federated invite, the federation sender would do
// the /v2/invite request (in which the remote server signs the invite)
// and then the signed event gets sent back to the roomserver as an input
// event. When the invite is local, we don't interact with the federation
// sender therefore we need to generate the loopback invite event for
// the room ourselves.
loopback, err := localInviteLoopback(ow, input)
if err != nil {
return nil, err
}
event := input.Event.Unwrap()
if len(input.InviteRoomState) > 0 {
// If we were supplied with some invite room state already (which is
// most likely to be if the event came in over federation) then use
// that.
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil {
return nil, err
}
} else {
// There's no invite room state, so let's have a go at building it
// up from local data (which is most likely to be if the event came
// from the CS API). If we know about the room then we can insert
// the invite room state, if we don't then we just fail quietly.
if irs, ierr := buildInviteStrippedState(ctx, r.DB, input); ierr == nil {
if err = event.SetUnsignedField("invite_room_state", irs); err != nil {
return nil, err
}
}
}
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
if err != nil {
return nil, err
}
if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil {
return nil, err
}
succeeded = true
return loopback, nil
}
func localInviteLoopback(
ow *RoomserverInternalAPI,
input api.InputInviteEvent,
) (ire *api.InputRoomEvent, err error) {
if input.Event.StateKey() == nil {
return nil, errors.New("no state key on invite event")
}
ourServerName := string(ow.Cfg.Matrix.ServerName)
_, theirServerName, err := gomatrixserverlib.SplitID('@', *input.Event.StateKey())
if err != nil {
return nil, err
}
// Check if the invite originated locally and is destined locally.
if input.Event.Origin() == ow.Cfg.Matrix.ServerName && string(theirServerName) == ourServerName {
rsEvent := input.Event.Sign(
ourServerName,
ow.Cfg.Matrix.KeyID,
ow.Cfg.Matrix.PrivateKey,
).Headered(input.RoomVersion)
ire = &api.InputRoomEvent{
Kind: api.KindNew,
Event: rsEvent,
AuthEventIDs: rsEvent.AuthEventIDs(),
SendAsServer: ourServerName,
TransactionID: nil,
}
}
return ire, nil
}
func buildInviteStrippedState(
ctx context.Context,
db storage.Database,
input api.InputInviteEvent,
) ([]gomatrixserverlib.InviteV2StrippedState, error) {
roomNID, err := db.RoomNID(ctx, input.Event.RoomID())
if err != nil || roomNID == 0 {
return nil, fmt.Errorf("room %q unknown", input.Event.RoomID())
}
stateWanted := []gomatrixserverlib.StateKeyTuple{}
// "If they are set on the room, at least the state for m.room.avatar, m.room.canonical_alias, m.room.join_rules, and m.room.name SHOULD be included."
// https://matrix.org/docs/spec/client_server/r0.6.0#m-room-member
for _, t := range []string{
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
"m.room.avatar",
} {
stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{
EventType: t,
StateKey: "",
})
}
_, currentStateSnapshotNID, _, err := db.LatestEventIDs(ctx, roomNID)
if err != nil {
return nil, err
}
roomState := state.NewStateResolution(db)
stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
ctx, currentStateSnapshotNID, stateWanted,
)
if err != nil {
return nil, err
}
stateNIDs := []types.EventNID{}
for _, stateNID := range stateEntries {
stateNIDs = append(stateNIDs, stateNID.EventNID)
}
stateEvents, err := db.Events(ctx, stateNIDs)
if err != nil {
return nil, err
}
inviteState := []gomatrixserverlib.InviteV2StrippedState{
gomatrixserverlib.NewInviteV2StrippedState(&input.Event.Event),
}
stateEvents = append(stateEvents, types.Event{Event: input.Event.Unwrap()})
for _, event := range stateEvents {
inviteState = append(inviteState, gomatrixserverlib.NewInviteV2StrippedState(&event.Event))
}
return inviteState, nil
}

View File

@ -0,0 +1,249 @@
package internal
import (
"context"
"errors"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
// PerformInvite handles inviting to matrix rooms, including over federation by talking to the federationsender.
func (r *RoomserverInternalAPI) PerformInvite(
ctx context.Context,
req *api.PerformInviteRequest,
res *api.PerformInviteResponse,
) {
err := r.performInvite(ctx, req)
if err != nil {
perr, ok := err.(*api.PerformError)
if ok {
res.Error = perr
} else {
res.Error = &api.PerformError{
Msg: err.Error(),
}
}
}
}
func (r *RoomserverInternalAPI) performInvite(ctx context.Context,
req *api.PerformInviteRequest,
) error {
loopback, err := r.processInviteEvent(ctx, r, req)
if err != nil {
return err
}
// The processInviteEvent function can optionally return a
// loopback room event containing the invite, for local invites.
// If it does, we should process it with the room events below.
if loopback != nil {
var loopbackRes api.InputRoomEventsResponse
err := r.InputRoomEvents(ctx, &api.InputRoomEventsRequest{
InputRoomEvents: []api.InputRoomEvent{*loopback},
}, &loopbackRes)
if err != nil {
return err
}
}
return nil
}
func (r *RoomserverInternalAPI) processInviteEvent(
ctx context.Context,
ow *RoomserverInternalAPI,
input *api.PerformInviteRequest,
) (*api.InputRoomEvent, error) {
if input.Event.StateKey() == nil {
return nil, fmt.Errorf("invite must be a state event")
}
roomID := input.Event.RoomID()
targetUserID := *input.Event.StateKey()
log.WithFields(log.Fields{
"event_id": input.Event.EventID(),
"room_id": roomID,
"room_version": input.RoomVersion,
"target_user_id": targetUserID,
}).Info("processing invite event")
_, domain, _ := gomatrixserverlib.SplitID('@', targetUserID)
isTargetLocalUser := domain == r.Cfg.Matrix.ServerName
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocalUser, input.RoomVersion)
if err != nil {
return nil, err
}
succeeded := false
defer func() {
txerr := sqlutil.EndTransaction(updater, &succeeded)
if err == nil && txerr != nil {
err = txerr
}
}()
if updater.IsJoin() {
// If the user is joined to the room then that takes precedence over this
// invite event. It makes little sense to move a user that is already
// joined to the room into the invite state.
// This could plausibly happen if an invite request raced with a join
// request for a user. For example if a user was invited to a public
// room and they joined the room at the same time as the invite was sent.
// The other way this could plausibly happen is if an invite raced with
// a kick. For example if a user was kicked from a room in error and in
// response someone else in the room re-invited them then it is possible
// for the invite request to race with the leave event so that the
// target receives invite before it learns that it has been kicked.
// There are a few ways this could be plausibly handled in the roomserver.
// 1) Store the invite, but mark it as retired. That will result in the
// permanent rejection of that invite event. So even if the target
// user leaves the room and the invite is retransmitted it will be
// ignored. However a new invite with a new event ID would still be
// accepted.
// 2) Silently discard the invite event. This means that if the event
// was retransmitted at a later date after the target user had left
// the room we would accept the invite. However since we hadn't told
// the sending server that the invite had been discarded it would
// have no reason to attempt to retry.
// 3) Signal the sending server that the user is already joined to the
// room.
// For now we will implement option 2. Since in the abesence of a retry
// mechanism it will be equivalent to option 1, and we don't have a
// signalling mechanism to implement option 3.
return nil, &api.PerformError{
Code: api.PerformErrorNoOperation,
Msg: "user is already joined to room",
}
}
// Normally, with a federated invite, the federation sender would do
// the /v2/invite request (in which the remote server signs the invite)
// and then the signed event gets sent back to the roomserver as an input
// event. When the invite is local, we don't interact with the federation
// sender therefore we need to generate the loopback invite event for
// the room ourselves.
loopback, err := localInviteLoopback(ow, input)
if err != nil {
return nil, err
}
event := input.Event.Unwrap()
if len(input.InviteRoomState) > 0 {
// If we were supplied with some invite room state already (which is
// most likely to be if the event came in over federation) then use
// that.
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil {
return nil, err
}
} else {
// There's no invite room state, so let's have a go at building it
// up from local data (which is most likely to be if the event came
// from the CS API). If we know about the room then we can insert
// the invite room state, if we don't then we just fail quietly.
if irs, ierr := buildInviteStrippedState(ctx, r.DB, input); ierr == nil {
if err = event.SetUnsignedField("invite_room_state", irs); err != nil {
return nil, err
}
}
}
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
if err != nil {
return nil, err
}
if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil {
return nil, err
}
succeeded = true
return loopback, nil
}
func localInviteLoopback(
ow *RoomserverInternalAPI,
input *api.PerformInviteRequest,
) (ire *api.InputRoomEvent, err error) {
if input.Event.StateKey() == nil {
return nil, errors.New("no state key on invite event")
}
ourServerName := string(ow.Cfg.Matrix.ServerName)
_, theirServerName, err := gomatrixserverlib.SplitID('@', *input.Event.StateKey())
if err != nil {
return nil, err
}
// Check if the invite originated locally and is destined locally.
if input.Event.Origin() == ow.Cfg.Matrix.ServerName && string(theirServerName) == ourServerName {
rsEvent := input.Event.Sign(
ourServerName,
ow.Cfg.Matrix.KeyID,
ow.Cfg.Matrix.PrivateKey,
).Headered(input.RoomVersion)
ire = &api.InputRoomEvent{
Kind: api.KindNew,
Event: rsEvent,
AuthEventIDs: rsEvent.AuthEventIDs(),
SendAsServer: ourServerName,
TransactionID: nil,
}
}
return ire, nil
}
func buildInviteStrippedState(
ctx context.Context,
db storage.Database,
input *api.PerformInviteRequest,
) ([]gomatrixserverlib.InviteV2StrippedState, error) {
roomNID, err := db.RoomNID(ctx, input.Event.RoomID())
if err != nil || roomNID == 0 {
return nil, fmt.Errorf("room %q unknown", input.Event.RoomID())
}
stateWanted := []gomatrixserverlib.StateKeyTuple{}
// "If they are set on the room, at least the state for m.room.avatar, m.room.canonical_alias, m.room.join_rules, and m.room.name SHOULD be included."
// https://matrix.org/docs/spec/client_server/r0.6.0#m-room-member
for _, t := range []string{
gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias,
gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules,
"m.room.avatar",
} {
stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{
EventType: t,
StateKey: "",
})
}
_, currentStateSnapshotNID, _, err := db.LatestEventIDs(ctx, roomNID)
if err != nil {
return nil, err
}
roomState := state.NewStateResolution(db)
stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
ctx, currentStateSnapshotNID, stateWanted,
)
if err != nil {
return nil, err
}
stateNIDs := []types.EventNID{}
for _, stateNID := range stateEntries {
stateNIDs = append(stateNIDs, stateNID.EventNID)
}
stateEvents, err := db.Events(ctx, stateNIDs)
if err != nil {
return nil, err
}
inviteState := []gomatrixserverlib.InviteV2StrippedState{
gomatrixserverlib.NewInviteV2StrippedState(&input.Event.Event),
}
stateEvents = append(stateEvents, types.Event{Event: input.Event.Unwrap()})
for _, event := range stateEvents {
inviteState = append(inviteState, gomatrixserverlib.NewInviteV2StrippedState(&event.Event))
}
return inviteState, nil
}

View File

@ -14,40 +14,63 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// WriteOutputEvents implements OutputRoomEventWriter // PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender.
func (r *RoomserverInternalAPI) PerformJoin( func (r *RoomserverInternalAPI) PerformJoin(
ctx context.Context, ctx context.Context,
req *api.PerformJoinRequest, req *api.PerformJoinRequest,
res *api.PerformJoinResponse, res *api.PerformJoinResponse,
) error { ) {
roomID, err := r.performJoin(ctx, req)
if err != nil {
perr, ok := err.(*api.PerformError)
if ok {
res.Error = perr
} else {
res.Error = &api.PerformError{
Msg: err.Error(),
}
}
}
res.RoomID = roomID
}
func (r *RoomserverInternalAPI) performJoin(
ctx context.Context,
req *api.PerformJoinRequest,
) (string, error) {
_, domain, err := gomatrixserverlib.SplitID('@', req.UserID) _, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil { if err != nil {
res.Error = api.JoinErrorBadRequest return "", &api.PerformError{
return fmt.Errorf("Supplied user ID %q in incorrect format", req.UserID) Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("Supplied user ID %q in incorrect format", req.UserID),
}
} }
if domain != r.Cfg.Matrix.ServerName { if domain != r.Cfg.Matrix.ServerName {
res.Error = api.JoinErrorBadRequest return "", &api.PerformError{
return fmt.Errorf("User %q does not belong to this homeserver", req.UserID) Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("User %q does not belong to this homeserver", req.UserID),
}
} }
if strings.HasPrefix(req.RoomIDOrAlias, "!") { if strings.HasPrefix(req.RoomIDOrAlias, "!") {
return r.performJoinRoomByID(ctx, req, res) return r.performJoinRoomByID(ctx, req)
} }
if strings.HasPrefix(req.RoomIDOrAlias, "#") { if strings.HasPrefix(req.RoomIDOrAlias, "#") {
return r.performJoinRoomByAlias(ctx, req, res) return r.performJoinRoomByAlias(ctx, req)
}
return "", &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("Room ID or alias %q is invalid", req.RoomIDOrAlias),
} }
res.Error = api.JoinErrorBadRequest
return fmt.Errorf("Room ID or alias %q is invalid", req.RoomIDOrAlias)
} }
func (r *RoomserverInternalAPI) performJoinRoomByAlias( func (r *RoomserverInternalAPI) performJoinRoomByAlias(
ctx context.Context, ctx context.Context,
req *api.PerformJoinRequest, req *api.PerformJoinRequest,
res *api.PerformJoinResponse, ) (string, error) {
) error {
// Get the domain part of the room alias. // Get the domain part of the room alias.
_, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias) _, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias)
if err != nil { if err != nil {
return fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias) return "", fmt.Errorf("Alias %q is not in the correct format", req.RoomIDOrAlias)
} }
req.ServerNames = append(req.ServerNames, domain) req.ServerNames = append(req.ServerNames, domain)
@ -65,7 +88,7 @@ func (r *RoomserverInternalAPI) performJoinRoomByAlias(
err = r.fsAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes) err = r.fsAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes)
if err != nil { if err != nil {
logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias) logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias)
return fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err) return "", fmt.Errorf("Looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err)
} }
roomID = dirRes.RoomID roomID = dirRes.RoomID
req.ServerNames = append(req.ServerNames, dirRes.ServerNames...) req.ServerNames = append(req.ServerNames, dirRes.ServerNames...)
@ -73,18 +96,18 @@ func (r *RoomserverInternalAPI) performJoinRoomByAlias(
// Otherwise, look up if we know this room alias locally. // Otherwise, look up if we know this room alias locally.
roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias) roomID, err = r.DB.GetRoomIDForAlias(ctx, req.RoomIDOrAlias)
if err != nil { if err != nil {
return fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err) return "", fmt.Errorf("Lookup room alias %q failed: %w", req.RoomIDOrAlias, err)
} }
} }
// If the room ID is empty then we failed to look up the alias. // If the room ID is empty then we failed to look up the alias.
if roomID == "" { if roomID == "" {
return fmt.Errorf("Alias %q not found", req.RoomIDOrAlias) return "", fmt.Errorf("Alias %q not found", req.RoomIDOrAlias)
} }
// If we do, then pluck out the room ID and continue the join. // If we do, then pluck out the room ID and continue the join.
req.RoomIDOrAlias = roomID req.RoomIDOrAlias = roomID
return r.performJoinRoomByID(ctx, req, res) return r.performJoinRoomByID(ctx, req)
} }
// TODO: Break this function up a bit // TODO: Break this function up a bit
@ -92,19 +115,14 @@ func (r *RoomserverInternalAPI) performJoinRoomByAlias(
func (r *RoomserverInternalAPI) performJoinRoomByID( func (r *RoomserverInternalAPI) performJoinRoomByID(
ctx context.Context, ctx context.Context,
req *api.PerformJoinRequest, req *api.PerformJoinRequest,
res *api.PerformJoinResponse, // nolint:unparam ) (string, error) {
) error {
// By this point, if req.RoomIDOrAlias contained an alias, then
// it will have been overwritten with a room ID by performJoinRoomByAlias.
// We should now include this in the response so that the CS API can
// return the right room ID.
res.RoomID = req.RoomIDOrAlias
// Get the domain part of the room ID. // Get the domain part of the room ID.
_, domain, err := gomatrixserverlib.SplitID('!', req.RoomIDOrAlias) _, domain, err := gomatrixserverlib.SplitID('!', req.RoomIDOrAlias)
if err != nil { if err != nil {
res.Error = api.JoinErrorBadRequest return "", &api.PerformError{
return fmt.Errorf("Room ID %q is invalid", req.RoomIDOrAlias) Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("Room ID %q is invalid: %s", req.RoomIDOrAlias, err),
}
} }
req.ServerNames = append(req.ServerNames, domain) req.ServerNames = append(req.ServerNames, domain)
@ -118,7 +136,7 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
Redacts: "", Redacts: "",
} }
if err = eb.SetUnsigned(struct{}{}); err != nil { if err = eb.SetUnsigned(struct{}{}); err != nil {
return fmt.Errorf("eb.SetUnsigned: %w", err) return "", fmt.Errorf("eb.SetUnsigned: %w", err)
} }
// It is possible for the request to include some "content" for the // It is possible for the request to include some "content" for the
@ -129,7 +147,7 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
} }
req.Content["membership"] = gomatrixserverlib.Join req.Content["membership"] = gomatrixserverlib.Join
if err = eb.SetContent(req.Content); err != nil { if err = eb.SetContent(req.Content); err != nil {
return fmt.Errorf("eb.SetContent: %w", err) return "", fmt.Errorf("eb.SetContent: %w", err)
} }
// First work out if this is in response to an existing invite // First work out if this is in response to an existing invite
@ -142,7 +160,7 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
// Check if there's an invite pending. // Check if there's an invite pending.
_, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender) _, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
if ierr != nil { if ierr != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) return "", fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
} }
// Check that the domain isn't ours. If it's local then we don't // Check that the domain isn't ours. If it's local then we don't
@ -154,7 +172,7 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
req.ServerNames = append(req.ServerNames, inviterDomain) req.ServerNames = append(req.ServerNames, inviterDomain)
// Perform a federated room join. // Perform a federated room join.
return r.performFederatedJoinRoomByID(ctx, req, res) return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
} }
} }
@ -205,9 +223,12 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
if err = r.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil { if err = r.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
var notAllowed *gomatrixserverlib.NotAllowed var notAllowed *gomatrixserverlib.NotAllowed
if errors.As(err, &notAllowed) { if errors.As(err, &notAllowed) {
res.Error = api.JoinErrorNotAllowed return "", &api.PerformError{
Code: api.PerformErrorNotAllowed,
Msg: fmt.Sprintf("InputRoomEvents auth failed: %s", err),
} }
return fmt.Errorf("r.InputRoomEvents: %w", err) }
return "", fmt.Errorf("r.InputRoomEvents: %w", err)
} }
} }
@ -216,25 +237,30 @@ func (r *RoomserverInternalAPI) performJoinRoomByID(
// room. If it is then there's nothing more to do - the room just // room. If it is then there's nothing more to do - the room just
// hasn't been created yet. // hasn't been created yet.
if domain == r.Cfg.Matrix.ServerName { if domain == r.Cfg.Matrix.ServerName {
res.Error = api.JoinErrorNoRoom return "", &api.PerformError{
return fmt.Errorf("Room ID %q does not exist", req.RoomIDOrAlias) Code: api.PerformErrorNoRoom,
Msg: fmt.Sprintf("Room ID %q does not exist", req.RoomIDOrAlias),
}
} }
// Perform a federated room join. // Perform a federated room join.
return r.performFederatedJoinRoomByID(ctx, req, res) return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
default: default:
// Something else went wrong. // Something else went wrong.
return fmt.Errorf("Error joining local room: %q", err) return "", fmt.Errorf("Error joining local room: %q", err)
} }
return nil // By this point, if req.RoomIDOrAlias contained an alias, then
// it will have been overwritten with a room ID by performJoinRoomByAlias.
// We should now include this in the response so that the CS API can
// return the right room ID.
return req.RoomIDOrAlias, nil
} }
func (r *RoomserverInternalAPI) performFederatedJoinRoomByID( func (r *RoomserverInternalAPI) performFederatedJoinRoomByID(
ctx context.Context, ctx context.Context,
req *api.PerformJoinRequest, req *api.PerformJoinRequest,
res *api.PerformJoinResponse, // nolint:unparam
) error { ) error {
// Try joining by all of the supplied server names. // Try joining by all of the supplied server names.
fedReq := fsAPI.PerformJoinRequest{ fedReq := fsAPI.PerformJoinRequest{

View File

@ -3,6 +3,7 @@ package inthttp
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"net/http" "net/http"
fsInputAPI "github.com/matrix-org/dendrite/federationsender/api" fsInputAPI "github.com/matrix-org/dendrite/federationsender/api"
@ -24,6 +25,7 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents" RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations // Perform operations
RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformJoinPath = "/roomserver/performJoin" RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave" RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill" RoomserverPerformBackfillPath = "/roomserver/performBackfill"
@ -146,16 +148,38 @@ func (h *httpRoomserverInternalAPI) InputRoomEvents(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
} }
func (h *httpRoomserverInternalAPI) PerformInvite(
ctx context.Context,
request *api.PerformInviteRequest,
response *api.PerformInviteResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInvite")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformInvitePath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.PerformError{
Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
}
}
}
func (h *httpRoomserverInternalAPI) PerformJoin( func (h *httpRoomserverInternalAPI) PerformJoin(
ctx context.Context, ctx context.Context,
request *api.PerformJoinRequest, request *api.PerformJoinRequest,
response *api.PerformJoinResponse, response *api.PerformJoinResponse,
) error { ) {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformJoin") span, ctx := opentracing.StartSpanFromContext(ctx, "PerformJoin")
defer span.Finish() defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformJoinPath apiURL := h.roomserverURL + RoomserverPerformJoinPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.PerformError{
Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
}
}
} }
func (h *httpRoomserverInternalAPI) PerformLeave( func (h *httpRoomserverInternalAPI) PerformLeave(

View File

@ -26,6 +26,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )
internalAPIMux.Handle(RoomserverPerformInvitePath,
httputil.MakeInternalAPI("performInvite", func(req *http.Request) util.JSONResponse {
var request api.PerformInviteRequest
var response api.PerformInviteResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
r.PerformInvite(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(RoomserverPerformJoinPath, internalAPIMux.Handle(RoomserverPerformJoinPath,
httputil.MakeInternalAPI("performJoin", func(req *http.Request) util.JSONResponse { httputil.MakeInternalAPI("performJoin", func(req *http.Request) util.JSONResponse {
var request api.PerformJoinRequest var request api.PerformJoinRequest
@ -33,9 +44,7 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
if err := json.NewDecoder(req.Body).Decode(&request); err != nil { if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error()) return util.MessageResponse(http.StatusBadRequest, err.Error())
} }
if err := r.PerformJoin(req.Context(), &request, &response); err != nil { r.PerformJoin(req.Context(), &request, &response)
response.ErrMsg = err.Error()
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response} return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}), }),
) )