Peeking updates (#1607)
* Add unpeek * Don't allow peeks into encrypted rooms * Fix send tests * Update consumers
This commit is contained in:
parent
2b03d24358
commit
be7d8595be
16 changed files with 300 additions and 2 deletions
|
@ -77,3 +77,28 @@ func PeekRoomByIDOrAlias(
|
|||
}{peekRes.RoomID},
|
||||
}
|
||||
}
|
||||
|
||||
func UnpeekRoomByID(
|
||||
req *http.Request,
|
||||
device *api.Device,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
accountDB accounts.Database,
|
||||
roomID string,
|
||||
) util.JSONResponse {
|
||||
unpeekReq := roomserverAPI.PerformUnpeekRequest{
|
||||
RoomID: roomID,
|
||||
UserID: device.UserID,
|
||||
DeviceID: device.ID,
|
||||
}
|
||||
unpeekRes := roomserverAPI.PerformUnpeekResponse{}
|
||||
|
||||
rsAPI.PerformUnpeek(req.Context(), &unpeekReq, &unpeekRes)
|
||||
if unpeekRes.Error != nil {
|
||||
return unpeekRes.Error.JSONResponse()
|
||||
}
|
||||
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusOK,
|
||||
JSON: struct{}{},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,6 +106,9 @@ func Setup(
|
|||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/peek/{roomIDOrAlias}",
|
||||
httputil.MakeAuthAPI(gomatrixserverlib.Peek, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
if r := rateLimits.rateLimit(req); r != nil {
|
||||
return *r
|
||||
}
|
||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
|
@ -148,6 +151,17 @@ func Setup(
|
|||
)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/rooms/{roomID}/unpeek",
|
||||
httputil.MakeAuthAPI("unpeek", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||
if err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return UnpeekRoomByID(
|
||||
req, device, rsAPI, accountDB, vars["roomID"],
|
||||
)
|
||||
}),
|
||||
).Methods(http.MethodPost, http.MethodOptions)
|
||||
r0mux.Handle("/rooms/{roomID}/ban",
|
||||
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||
|
|
|
@ -131,6 +131,13 @@ func (t *testRoomserverAPI) PerformPeek(
|
|||
) {
|
||||
}
|
||||
|
||||
func (t *testRoomserverAPI) PerformUnpeek(
|
||||
ctx context.Context,
|
||||
req *api.PerformUnpeekRequest,
|
||||
res *api.PerformUnpeekResponse,
|
||||
) {
|
||||
}
|
||||
|
||||
func (t *testRoomserverAPI) PerformPublish(
|
||||
ctx context.Context,
|
||||
req *api.PerformPublishRequest,
|
||||
|
|
|
@ -42,6 +42,12 @@ type RoomserverInternalAPI interface {
|
|||
res *PerformPeekResponse,
|
||||
)
|
||||
|
||||
PerformUnpeek(
|
||||
ctx context.Context,
|
||||
req *PerformUnpeekRequest,
|
||||
res *PerformUnpeekResponse,
|
||||
)
|
||||
|
||||
PerformPublish(
|
||||
ctx context.Context,
|
||||
req *PerformPublishRequest,
|
||||
|
|
|
@ -46,6 +46,15 @@ func (t *RoomserverInternalAPITrace) PerformPeek(
|
|||
util.GetLogger(ctx).Infof("PerformPeek req=%+v res=%+v", js(req), js(res))
|
||||
}
|
||||
|
||||
func (t *RoomserverInternalAPITrace) PerformUnpeek(
|
||||
ctx context.Context,
|
||||
req *PerformUnpeekRequest,
|
||||
res *PerformUnpeekResponse,
|
||||
) {
|
||||
t.Impl.PerformUnpeek(ctx, req, res)
|
||||
util.GetLogger(ctx).Infof("PerformUnpeek req=%+v res=%+v", js(req), js(res))
|
||||
}
|
||||
|
||||
func (t *RoomserverInternalAPITrace) PerformJoin(
|
||||
ctx context.Context,
|
||||
req *PerformJoinRequest,
|
||||
|
|
|
@ -51,6 +51,8 @@ const (
|
|||
|
||||
// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
|
||||
OutputTypeNewPeek OutputType = "new_peek"
|
||||
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
|
||||
OutputTypeRetirePeek OutputType = "retire_peek"
|
||||
)
|
||||
|
||||
// An OutputEvent is an entry in the roomserver output kafka log.
|
||||
|
@ -70,6 +72,8 @@ type OutputEvent struct {
|
|||
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
|
||||
// The content of event with type OutputTypeNewPeek
|
||||
NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
|
||||
// The content of event with type OutputTypeRetirePeek
|
||||
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
|
||||
}
|
||||
|
||||
// Type of the OutputNewRoomEvent.
|
||||
|
@ -240,3 +244,10 @@ type OutputNewPeek struct {
|
|||
UserID string
|
||||
DeviceID string
|
||||
}
|
||||
|
||||
// An OutputRetirePeek is written whenever a user stops peeking into a room.
|
||||
type OutputRetirePeek struct {
|
||||
RoomID string
|
||||
UserID string
|
||||
DeviceID string
|
||||
}
|
||||
|
|
|
@ -123,6 +123,17 @@ type PerformPeekResponse struct {
|
|||
Error *PerformError
|
||||
}
|
||||
|
||||
type PerformUnpeekRequest struct {
|
||||
RoomID string `json:"room_id"`
|
||||
UserID string `json:"user_id"`
|
||||
DeviceID string `json:"device_id"`
|
||||
}
|
||||
|
||||
type PerformUnpeekResponse struct {
|
||||
// If non-nil, the join request failed. Contains more information why it failed.
|
||||
Error *PerformError
|
||||
}
|
||||
|
||||
// PerformBackfillRequest is a request to PerformBackfill.
|
||||
type PerformBackfillRequest struct {
|
||||
// The room to backfill
|
||||
|
|
|
@ -23,6 +23,7 @@ type RoomserverInternalAPI struct {
|
|||
*perform.Inviter
|
||||
*perform.Joiner
|
||||
*perform.Peeker
|
||||
*perform.Unpeeker
|
||||
*perform.Leaver
|
||||
*perform.Publisher
|
||||
*perform.Backfiller
|
||||
|
@ -94,6 +95,13 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
|
|||
FSAPI: r.fsAPI,
|
||||
Inputer: r.Inputer,
|
||||
}
|
||||
r.Unpeeker = &perform.Unpeeker{
|
||||
ServerName: r.Cfg.Matrix.ServerName,
|
||||
Cfg: r.Cfg,
|
||||
DB: r.DB,
|
||||
FSAPI: r.fsAPI,
|
||||
Inputer: r.Inputer,
|
||||
}
|
||||
r.Leaver = &perform.Leaver{
|
||||
Cfg: r.Cfg,
|
||||
DB: r.DB,
|
||||
|
|
|
@ -163,8 +163,7 @@ func (r *Peeker) performPeekRoomByID(
|
|||
// XXX: we should probably factor out history_visibility checks into a common utility method somewhere
|
||||
// which handles the default value etc.
|
||||
var worldReadable = false
|
||||
ev, _ := r.DB.GetStateEvent(ctx, roomID, "m.room.history_visibility", "")
|
||||
if ev != nil {
|
||||
if ev, _ := r.DB.GetStateEvent(ctx, roomID, "m.room.history_visibility", ""); ev != nil {
|
||||
content := map[string]string{}
|
||||
if err = json.Unmarshal(ev.Content(), &content); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("json.Unmarshal for history visibility failed")
|
||||
|
@ -182,6 +181,13 @@ func (r *Peeker) performPeekRoomByID(
|
|||
}
|
||||
}
|
||||
|
||||
if ev, _ := r.DB.GetStateEvent(ctx, roomID, "m.room.encryption", ""); ev != nil {
|
||||
return "", &api.PerformError{
|
||||
Code: api.PerformErrorNotAllowed,
|
||||
Msg: "Cannot peek into an encrypted room",
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: handle federated peeks
|
||||
|
||||
err = r.Inputer.WriteOutputEvents(roomID, []api.OutputEvent{
|
||||
|
|
118
roomserver/internal/perform/perform_unpeek.go
Normal file
118
roomserver/internal/perform/perform_unpeek.go
Normal file
|
@ -0,0 +1,118 @@
|
|||
// Copyright 2020 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package perform
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/input"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Unpeeker struct {
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
Cfg *config.RoomServer
|
||||
FSAPI fsAPI.FederationSenderInternalAPI
|
||||
DB storage.Database
|
||||
|
||||
Inputer *input.Inputer
|
||||
}
|
||||
|
||||
// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationsender.
|
||||
func (r *Unpeeker) PerformUnpeek(
|
||||
ctx context.Context,
|
||||
req *api.PerformUnpeekRequest,
|
||||
res *api.PerformUnpeekResponse,
|
||||
) {
|
||||
if err := r.performUnpeek(ctx, req); err != nil {
|
||||
perr, ok := err.(*api.PerformError)
|
||||
if ok {
|
||||
res.Error = perr
|
||||
} else {
|
||||
res.Error = &api.PerformError{
|
||||
Msg: err.Error(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Unpeeker) performUnpeek(
|
||||
ctx context.Context,
|
||||
req *api.PerformUnpeekRequest,
|
||||
) error {
|
||||
// FIXME: there's way too much duplication with performJoin
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
|
||||
if err != nil {
|
||||
return &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("Supplied user ID %q in incorrect format", req.UserID),
|
||||
}
|
||||
}
|
||||
if domain != r.Cfg.Matrix.ServerName {
|
||||
return &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("User %q does not belong to this homeserver", req.UserID),
|
||||
}
|
||||
}
|
||||
if strings.HasPrefix(req.RoomID, "!") {
|
||||
return r.performUnpeekRoomByID(ctx, req)
|
||||
}
|
||||
return &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("Room ID %q is invalid", req.RoomID),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Unpeeker) performUnpeekRoomByID(
|
||||
_ context.Context,
|
||||
req *api.PerformUnpeekRequest,
|
||||
) (err error) {
|
||||
// Get the domain part of the room ID.
|
||||
_, _, err = gomatrixserverlib.SplitID('!', req.RoomID)
|
||||
if err != nil {
|
||||
return &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: fmt.Sprintf("Room ID %q is invalid: %s", req.RoomID, err),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: handle federated peeks
|
||||
|
||||
err = r.Inputer.WriteOutputEvents(req.RoomID, []api.OutputEvent{
|
||||
{
|
||||
Type: api.OutputTypeRetirePeek,
|
||||
RetirePeek: &api.OutputRetirePeek{
|
||||
RoomID: req.RoomID,
|
||||
UserID: req.UserID,
|
||||
DeviceID: req.DeviceID,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// By this point, if req.RoomIDOrAlias contained an alias, then
|
||||
// it will have been overwritten with a room ID by performPeekRoomByAlias.
|
||||
// We should now include this in the response so that the CS API can
|
||||
// return the right room ID.
|
||||
return nil
|
||||
}
|
|
@ -27,6 +27,7 @@ const (
|
|||
// Perform operations
|
||||
RoomserverPerformInvitePath = "/roomserver/performInvite"
|
||||
RoomserverPerformPeekPath = "/roomserver/performPeek"
|
||||
RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
|
||||
RoomserverPerformJoinPath = "/roomserver/performJoin"
|
||||
RoomserverPerformLeavePath = "/roomserver/performLeave"
|
||||
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
|
||||
|
@ -209,6 +210,23 @@ func (h *httpRoomserverInternalAPI) PerformPeek(
|
|||
}
|
||||
}
|
||||
|
||||
func (h *httpRoomserverInternalAPI) PerformUnpeek(
|
||||
ctx context.Context,
|
||||
request *api.PerformUnpeekRequest,
|
||||
response *api.PerformUnpeekResponse,
|
||||
) {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformUnpeek")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.roomserverURL + RoomserverPerformUnpeekPath
|
||||
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
if err != nil {
|
||||
response.Error = &api.PerformError{
|
||||
Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpRoomserverInternalAPI) PerformLeave(
|
||||
ctx context.Context,
|
||||
request *api.PerformLeaveRequest,
|
||||
|
|
|
@ -72,6 +72,17 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(RoomserverPerformPeekPath,
|
||||
httputil.MakeInternalAPI("performUnpeek", func(req *http.Request) util.JSONResponse {
|
||||
var request api.PerformUnpeekRequest
|
||||
var response api.PerformUnpeekResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
r.PerformUnpeek(req.Context(), &request, &response)
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(RoomserverPerformPublishPath,
|
||||
httputil.MakeInternalAPI("performPublish", func(req *http.Request) util.JSONResponse {
|
||||
var request api.PerformPublishRequest
|
||||
|
|
|
@ -105,6 +105,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
|
||||
case api.OutputTypeNewPeek:
|
||||
return s.onNewPeek(context.TODO(), *output.NewPeek)
|
||||
case api.OutputTypeRetirePeek:
|
||||
return s.onRetirePeek(context.TODO(), *output.RetirePeek)
|
||||
case api.OutputTypeRedactedEvent:
|
||||
return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
|
||||
default:
|
||||
|
@ -309,6 +311,26 @@ func (s *OutputRoomEventConsumer) onNewPeek(
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *OutputRoomEventConsumer) onRetirePeek(
|
||||
ctx context.Context, msg api.OutputRetirePeek,
|
||||
) error {
|
||||
sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
||||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write peek failure")
|
||||
return nil
|
||||
}
|
||||
// tell the notifier about the new peek so it knows to wake up new devices
|
||||
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
|
||||
|
||||
// we need to wake up the users who might need to now be peeking into this room,
|
||||
// so we send in a dummy event to trigger a wakeup
|
||||
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
if event.StateKey() == nil {
|
||||
return event, nil
|
||||
|
|
|
@ -91,6 +91,9 @@ type Database interface {
|
|||
// AddPeek adds a new peek to our DB for a given room by a given user's device.
|
||||
// Returns an error if there was a problem communicating with the database.
|
||||
AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error)
|
||||
// DeletePeek removes an existing peek from the database for a given room by a user's device.
|
||||
// Returns an error if there was a problem communicating with the database.
|
||||
DeletePeek(ctx context.Context, roomID, userID, deviceID string) (sp types.StreamPosition, err error)
|
||||
// DeletePeek deletes all peeks for a given room by a given user
|
||||
// Returns an error if there was a problem communicating with the database.
|
||||
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
|
||||
|
|
|
@ -178,6 +178,23 @@ func (d *Database) AddPeek(
|
|||
return
|
||||
}
|
||||
|
||||
// DeletePeeks tracks the fact that a user has stopped peeking from the specified
|
||||
// device. If the peeks was successfully deleted this returns the stream ID it was
|
||||
// stored at. Returns an error if there was a problem communicating with the database.
|
||||
func (d *Database) DeletePeek(
|
||||
ctx context.Context, roomID, userID, deviceID string,
|
||||
) (sp types.StreamPosition, err error) {
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
sp, err = d.Peeks.DeletePeek(ctx, txn, roomID, userID, deviceID)
|
||||
return err
|
||||
})
|
||||
if err == sql.ErrNoRows {
|
||||
sp = 0
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeletePeeks tracks the fact that a user has stopped peeking from all devices
|
||||
// If the peeks was successfully deleted this returns the stream ID it was stored at.
|
||||
// Returns an error if there was a problem communicating with the database.
|
||||
|
|
|
@ -137,6 +137,18 @@ func (n *Notifier) OnNewPeek(
|
|||
// by calling OnNewEvent.
|
||||
}
|
||||
|
||||
func (n *Notifier) OnRetirePeek(
|
||||
roomID, userID, deviceID string,
|
||||
) {
|
||||
n.streamLock.Lock()
|
||||
defer n.streamLock.Unlock()
|
||||
|
||||
n.removePeekingDevice(roomID, userID, deviceID)
|
||||
|
||||
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
|
||||
// by calling OnRetireEvent.
|
||||
}
|
||||
|
||||
func (n *Notifier) OnNewSendToDevice(
|
||||
userID string, deviceIDs []string,
|
||||
posUpdate types.StreamingToken,
|
||||
|
|
Loading…
Reference in a new issue