b28674435e
* Correctly generate backpagination tokens for events which have the same depth With tests. Unfortunately the code around here is hard to understand. There will be a subsequent PR which fixes this up now that we have a test harness in place. * Add postgres impl * More linting * Fix psql statement so it actually works
289 lines
11 KiB
Go
289 lines
11 KiB
Go
// Copyright 2017 Vector Creations Ltd
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package types
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
"github.com/tidwall/gjson"
|
|
)
|
|
|
|
var (
|
|
// ErrInvalidPaginationTokenType is returned when an attempt at creating a
|
|
// new instance of PaginationToken with an invalid type (i.e. neither "s"
|
|
// nor "t").
|
|
ErrInvalidPaginationTokenType = fmt.Errorf("Pagination token has an unknown prefix (should be either s or t)")
|
|
// ErrInvalidPaginationTokenLen is returned when the pagination token is an
|
|
// invalid length
|
|
ErrInvalidPaginationTokenLen = fmt.Errorf("Pagination token has an invalid length")
|
|
)
|
|
|
|
// StreamPosition represents the offset in the sync stream a client is at.
|
|
type StreamPosition int64
|
|
|
|
// Same as gomatrixserverlib.Event but also has the PDU stream position for this event.
|
|
type StreamEvent struct {
|
|
gomatrixserverlib.HeaderedEvent
|
|
StreamPosition StreamPosition
|
|
TransactionID *api.TransactionID
|
|
ExcludeFromSync bool
|
|
}
|
|
|
|
// PaginationTokenType represents the type of a pagination token.
|
|
// It can be either "s" (representing a position in the whole stream of events)
|
|
// or "t" (representing a position in a room's topology/depth).
|
|
type PaginationTokenType string
|
|
|
|
const (
|
|
// PaginationTokenTypeStream represents a position in the server's whole
|
|
// stream of events
|
|
PaginationTokenTypeStream PaginationTokenType = "s"
|
|
// PaginationTokenTypeTopology represents a position in a room's topology.
|
|
PaginationTokenTypeTopology PaginationTokenType = "t"
|
|
)
|
|
|
|
// PaginationToken represents a pagination token, used for interactions with
|
|
// /sync or /messages, for example.
|
|
type PaginationToken struct {
|
|
//Position StreamPosition
|
|
Type PaginationTokenType
|
|
// For /sync, this is the PDU position. For /messages, this is the topological position (depth).
|
|
// TODO: Given how different the positions are depending on the token type, they should probably be renamed
|
|
// or use different structs altogether.
|
|
PDUPosition StreamPosition
|
|
// For /sync, this is the EDU position. For /messages, this is the stream (PDU) position.
|
|
// TODO: Given how different the positions are depending on the token type, they should probably be renamed
|
|
// or use different structs altogether.
|
|
EDUTypingPosition StreamPosition
|
|
}
|
|
|
|
// NewPaginationTokenFromString takes a string of the form "xyyyy..." where "x"
|
|
// represents the type of a pagination token and "yyyy..." the token itself, and
|
|
// parses it in order to create a new instance of PaginationToken. Returns an
|
|
// error if the token couldn't be parsed into an int64, or if the token type
|
|
// isn't a known type (returns ErrInvalidPaginationTokenType in the latter
|
|
// case).
|
|
func NewPaginationTokenFromString(s string) (token *PaginationToken, err error) {
|
|
if len(s) == 0 {
|
|
return nil, ErrInvalidPaginationTokenLen
|
|
}
|
|
|
|
token = new(PaginationToken)
|
|
var positions []string
|
|
|
|
switch t := PaginationTokenType(s[:1]); t {
|
|
case PaginationTokenTypeStream, PaginationTokenTypeTopology:
|
|
token.Type = t
|
|
positions = strings.Split(s[1:], "_")
|
|
default:
|
|
token.Type = PaginationTokenTypeStream
|
|
positions = strings.Split(s, "_")
|
|
}
|
|
|
|
// Try to get the PDU position.
|
|
if len(positions) >= 1 {
|
|
if pduPos, err := strconv.ParseInt(positions[0], 10, 64); err != nil {
|
|
return nil, err
|
|
} else if pduPos < 0 {
|
|
return nil, errors.New("negative PDU position not allowed")
|
|
} else {
|
|
token.PDUPosition = StreamPosition(pduPos)
|
|
}
|
|
}
|
|
|
|
// Try to get the typing position.
|
|
if len(positions) >= 2 {
|
|
if typPos, err := strconv.ParseInt(positions[1], 10, 64); err != nil {
|
|
return nil, err
|
|
} else if typPos < 0 {
|
|
return nil, errors.New("negative EDU typing position not allowed")
|
|
} else {
|
|
token.EDUTypingPosition = StreamPosition(typPos)
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// NewPaginationTokenFromTypeAndPosition takes a PaginationTokenType and a
|
|
// StreamPosition and returns an instance of PaginationToken.
|
|
func NewPaginationTokenFromTypeAndPosition(
|
|
t PaginationTokenType, pdupos StreamPosition, typpos StreamPosition,
|
|
) (p *PaginationToken) {
|
|
return &PaginationToken{
|
|
Type: t,
|
|
PDUPosition: pdupos,
|
|
EDUTypingPosition: typpos,
|
|
}
|
|
}
|
|
|
|
// String translates a PaginationToken to a string of the "xyyyy..." (see
|
|
// NewPaginationToken to know what it represents).
|
|
func (p *PaginationToken) String() string {
|
|
return fmt.Sprintf("%s%d_%d", p.Type, p.PDUPosition, p.EDUTypingPosition)
|
|
}
|
|
|
|
// WithUpdates returns a copy of the PaginationToken with updates applied from another PaginationToken.
|
|
// If the latter PaginationToken contains a field that is not 0, it is considered an update,
|
|
// and its value will replace the corresponding value in the PaginationToken on which WithUpdates is called.
|
|
func (pt *PaginationToken) WithUpdates(other PaginationToken) PaginationToken {
|
|
ret := *pt
|
|
if other.PDUPosition != 0 {
|
|
ret.PDUPosition = other.PDUPosition
|
|
}
|
|
if other.EDUTypingPosition != 0 {
|
|
ret.EDUTypingPosition = other.EDUTypingPosition
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// IsAfter returns whether one PaginationToken refers to states newer than another PaginationToken.
|
|
func (sp *PaginationToken) IsAfter(other PaginationToken) bool {
|
|
return sp.PDUPosition > other.PDUPosition ||
|
|
sp.EDUTypingPosition > other.EDUTypingPosition
|
|
}
|
|
|
|
// PrevEventRef represents a reference to a previous event in a state event upgrade
|
|
type PrevEventRef struct {
|
|
PrevContent json.RawMessage `json:"prev_content"`
|
|
ReplacesState string `json:"replaces_state"`
|
|
PrevSender string `json:"prev_sender"`
|
|
}
|
|
|
|
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
|
type Response struct {
|
|
NextBatch string `json:"next_batch"`
|
|
AccountData struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
} `json:"account_data"`
|
|
Presence struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
} `json:"presence"`
|
|
Rooms struct {
|
|
Join map[string]JoinResponse `json:"join"`
|
|
Invite map[string]InviteResponse `json:"invite"`
|
|
Leave map[string]LeaveResponse `json:"leave"`
|
|
} `json:"rooms"`
|
|
}
|
|
|
|
// NewResponse creates an empty response with initialised maps.
|
|
func NewResponse(token PaginationToken) *Response {
|
|
res := Response{
|
|
NextBatch: token.String(),
|
|
}
|
|
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
|
|
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
|
|
res.Rooms.Join = make(map[string]JoinResponse)
|
|
res.Rooms.Invite = make(map[string]InviteResponse)
|
|
res.Rooms.Leave = make(map[string]LeaveResponse)
|
|
|
|
// Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
|
|
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
|
|
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
|
|
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
|
|
res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
res.Presence.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
|
|
// Fill next_batch with a pagination token. Since this is a response to a sync request, we can assume
|
|
// we'll always return a stream token.
|
|
res.NextBatch = NewPaginationTokenFromTypeAndPosition(
|
|
PaginationTokenTypeStream,
|
|
StreamPosition(token.PDUPosition),
|
|
StreamPosition(token.EDUTypingPosition),
|
|
).String()
|
|
|
|
return &res
|
|
}
|
|
|
|
// IsEmpty returns true if the response is empty, i.e. used to decided whether
|
|
// to return the response immediately to the client or to wait for more data.
|
|
func (r *Response) IsEmpty() bool {
|
|
return len(r.Rooms.Join) == 0 &&
|
|
len(r.Rooms.Invite) == 0 &&
|
|
len(r.Rooms.Leave) == 0 &&
|
|
len(r.AccountData.Events) == 0 &&
|
|
len(r.Presence.Events) == 0
|
|
}
|
|
|
|
// JoinResponse represents a /sync response for a room which is under the 'join' key.
|
|
type JoinResponse struct {
|
|
State struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
} `json:"state"`
|
|
Timeline struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
Limited bool `json:"limited"`
|
|
PrevBatch string `json:"prev_batch"`
|
|
} `json:"timeline"`
|
|
Ephemeral struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
} `json:"ephemeral"`
|
|
AccountData struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
} `json:"account_data"`
|
|
}
|
|
|
|
// NewJoinResponse creates an empty response with initialised arrays.
|
|
func NewJoinResponse() *JoinResponse {
|
|
res := JoinResponse{}
|
|
res.State.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
res.Ephemeral.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
res.AccountData.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
return &res
|
|
}
|
|
|
|
// InviteResponse represents a /sync response for a room which is under the 'invite' key.
|
|
type InviteResponse struct {
|
|
InviteState struct {
|
|
Events json.RawMessage `json:"events"`
|
|
} `json:"invite_state"`
|
|
}
|
|
|
|
// NewInviteResponse creates an empty response with initialised arrays.
|
|
func NewInviteResponse(event gomatrixserverlib.HeaderedEvent) *InviteResponse {
|
|
res := InviteResponse{}
|
|
res.InviteState.Events = json.RawMessage{'[', ']'}
|
|
if inviteRoomState := gjson.GetBytes(event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
|
|
res.InviteState.Events = json.RawMessage(inviteRoomState.Raw)
|
|
}
|
|
return &res
|
|
}
|
|
|
|
// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
|
|
type LeaveResponse struct {
|
|
State struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
} `json:"state"`
|
|
Timeline struct {
|
|
Events []gomatrixserverlib.ClientEvent `json:"events"`
|
|
Limited bool `json:"limited"`
|
|
PrevBatch string `json:"prev_batch"`
|
|
} `json:"timeline"`
|
|
}
|
|
|
|
// NewLeaveResponse creates an empty response with initialised arrays.
|
|
func NewLeaveResponse() *LeaveResponse {
|
|
res := LeaveResponse{}
|
|
res.State.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
res.Timeline.Events = make([]gomatrixserverlib.ClientEvent, 0)
|
|
return &res
|
|
}
|