// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package types

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"

	"github.com/matrix-org/dendrite/roomserver/api"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/tidwall/gjson"
)

var (
	// ErrInvalidSyncTokenType is returned when an attempt at creating a
	// new instance of SyncToken with an invalid type (i.e. neither "s"
	// nor "t").
	ErrInvalidSyncTokenType = fmt.Errorf("Sync token has an unknown prefix (should be either s or t)")
	// ErrInvalidSyncTokenLen is returned when the pagination token is an
	// invalid length
	ErrInvalidSyncTokenLen = fmt.Errorf("Sync token has an invalid length")
)

type StateDelta struct {
	RoomID      string
	StateEvents []*gomatrixserverlib.HeaderedEvent
	Membership  string
	// The PDU stream position of the latest membership event for this user, if applicable.
	// Can be 0 if there is no membership event in this delta.
	MembershipPos StreamPosition
}

// StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64

// LogPosition represents the offset in a Kafka log a client is at.
type LogPosition struct {
	Partition int32
	Offset    int64
}

func (p *LogPosition) IsEmpty() bool {
	return p.Offset == 0
}

// IsAfter returns true if this position is after `lp`.
func (p *LogPosition) IsAfter(lp *LogPosition) bool {
	if lp == nil {
		return false
	}
	if p.Partition != lp.Partition {
		return false
	}
	return p.Offset > lp.Offset
}

// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
type StreamEvent struct {
	*gomatrixserverlib.HeaderedEvent
	StreamPosition  StreamPosition
	TransactionID   *api.TransactionID
	ExcludeFromSync bool
}

// Range represents a range between two stream positions.
type Range struct {
	// From is the position the client has already received.
	From StreamPosition
	// To is the position the client is going towards.
	To StreamPosition
	// True if the client is going backwards
	Backwards bool
}

// Low returns the low number of the range.
// This represents the position the client already has and hence is exclusive.
func (r *Range) Low() StreamPosition {
	if !r.Backwards {
		return r.From
	}
	return r.To
}

// High returns the high number of the range
// This represents the position the client is going towards and hence is inclusive.
func (r *Range) High() StreamPosition {
	if !r.Backwards {
		return r.To
	}
	return r.From
}

// SyncTokenType represents the type of a sync token.
// It can be either "s" (representing a position in the whole stream of events)
// or "t" (representing a position in a room's topology/depth).
type SyncTokenType string

const (
	// SyncTokenTypeStream represents a position in the server's whole
	// stream of events
	SyncTokenTypeStream SyncTokenType = "s"
	// SyncTokenTypeTopology represents a position in a room's topology.
	SyncTokenTypeTopology SyncTokenType = "t"
)

type StreamingToken struct {
	PDUPosition          StreamPosition
	TypingPosition       StreamPosition
	ReceiptPosition      StreamPosition
	SendToDevicePosition StreamPosition
	InvitePosition       StreamPosition
	AccountDataPosition  StreamPosition
	DeviceListPosition   LogPosition
}

// This will be used as a fallback by json.Marshal.
func (s StreamingToken) MarshalText() ([]byte, error) {
	return []byte(s.String()), nil
}

// This will be used as a fallback by json.Unmarshal.
func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
	*s, err = NewStreamTokenFromString(string(text))
	return err
}

func (t StreamingToken) String() string {
	posStr := fmt.Sprintf(
		"s%d_%d_%d_%d_%d_%d",
		t.PDUPosition, t.TypingPosition,
		t.ReceiptPosition, t.SendToDevicePosition,
		t.InvitePosition, t.AccountDataPosition,
	)
	if dl := t.DeviceListPosition; !dl.IsEmpty() {
		posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
	}
	return posStr
}

// IsAfter returns true if ANY position in this token is greater than `other`.
func (t *StreamingToken) IsAfter(other StreamingToken) bool {
	switch {
	case t.PDUPosition > other.PDUPosition:
		return true
	case t.TypingPosition > other.TypingPosition:
		return true
	case t.ReceiptPosition > other.ReceiptPosition:
		return true
	case t.SendToDevicePosition > other.SendToDevicePosition:
		return true
	case t.InvitePosition > other.InvitePosition:
		return true
	case t.AccountDataPosition > other.AccountDataPosition:
		return true
	case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
		return true
	}
	return false
}

func (t *StreamingToken) IsEmpty() bool {
	return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty()
}

// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
// If the latter StreamingToken contains a field that is not 0, it is considered an update,
// and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called.
// If the other token has a log, they will replace any existing log on this token.
func (t *StreamingToken) WithUpdates(other StreamingToken) StreamingToken {
	ret := *t
	ret.ApplyUpdates(other)
	return ret
}

// ApplyUpdates applies any changes from the supplied StreamingToken. If the supplied
// streaming token contains any positions that are not 0, they are considered updates
// and will overwrite the value in the token.
func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
	if other.PDUPosition > t.PDUPosition {
		t.PDUPosition = other.PDUPosition
	}
	if other.TypingPosition > t.TypingPosition {
		t.TypingPosition = other.TypingPosition
	}
	if other.ReceiptPosition > t.ReceiptPosition {
		t.ReceiptPosition = other.ReceiptPosition
	}
	if other.SendToDevicePosition > t.SendToDevicePosition {
		t.SendToDevicePosition = other.SendToDevicePosition
	}
	if other.InvitePosition > t.InvitePosition {
		t.InvitePosition = other.InvitePosition
	}
	if other.AccountDataPosition > t.AccountDataPosition {
		t.AccountDataPosition = other.AccountDataPosition
	}
	if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) {
		t.DeviceListPosition = other.DeviceListPosition
	}
}

type TopologyToken struct {
	Depth       StreamPosition
	PDUPosition StreamPosition
}

// This will be used as a fallback by json.Marshal.
func (t TopologyToken) MarshalText() ([]byte, error) {
	return []byte(t.String()), nil
}

// This will be used as a fallback by json.Unmarshal.
func (t *TopologyToken) UnmarshalText(text []byte) (err error) {
	*t, err = NewTopologyTokenFromString(string(text))
	return err
}

func (t *TopologyToken) StreamToken() StreamingToken {
	return StreamingToken{
		PDUPosition: t.PDUPosition,
	}
}

func (t TopologyToken) String() string {
	return fmt.Sprintf("t%d_%d", t.Depth, t.PDUPosition)
}

// Decrement the topology token to one event earlier.
func (t *TopologyToken) Decrement() {
	depth := t.Depth
	pduPos := t.PDUPosition
	if depth-1 <= 0 {
		// nothing can be lower than this
		depth = 1
	} else {
		// this assumes that we will never have 1000 events all with the same
		// depth. TODO: work out what the right PDU position is to use, probably needs a db hit.
		depth--
		pduPos += 1000
	}
	// The lowest token value is 1, therefore we need to manually set it to that
	// value if we're below it.
	if depth < 1 {
		depth = 1
	}
	t.Depth = depth
	t.PDUPosition = pduPos
}

func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
	if len(tok) < 1 {
		err = fmt.Errorf("empty topology token")
		return
	}
	if tok[0] != SyncTokenTypeTopology[0] {
		err = fmt.Errorf("topology token must start with 't'")
		return
	}
	parts := strings.Split(tok[1:], "_")
	var positions [2]StreamPosition
	for i, p := range parts {
		if i > len(positions) {
			break
		}
		var pos int
		pos, err = strconv.Atoi(p)
		if err != nil {
			return
		}
		positions[i] = StreamPosition(pos)
	}
	token = TopologyToken{
		Depth:       positions[0],
		PDUPosition: positions[1],
	}
	return
}

func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
	if len(tok) < 1 {
		err = fmt.Errorf("empty stream token")
		return
	}
	if tok[0] != SyncTokenTypeStream[0] {
		err = fmt.Errorf("stream token must start with 's'")
		return
	}
	categories := strings.Split(tok[1:], ".")
	parts := strings.Split(categories[0], "_")
	var positions [6]StreamPosition
	for i, p := range parts {
		if i > len(positions) {
			break
		}
		var pos int
		pos, err = strconv.Atoi(p)
		if err != nil {
			return
		}
		positions[i] = StreamPosition(pos)
	}
	token = StreamingToken{
		PDUPosition:          positions[0],
		TypingPosition:       positions[1],
		ReceiptPosition:      positions[2],
		SendToDevicePosition: positions[3],
		InvitePosition:       positions[4],
		AccountDataPosition:  positions[5],
	}
	// dl-0-1234
	// $log_name-$partition-$offset
	for _, logStr := range categories[1:] {
		segments := strings.Split(logStr, "-")
		if len(segments) != 3 {
			err = fmt.Errorf("invalid log position %q", logStr)
			return
		}
		switch segments[0] {
		case "dl":
			// Device list syncing
			var partition, offset int
			if partition, err = strconv.Atoi(segments[1]); err != nil {
				return
			}
			if offset, err = strconv.Atoi(segments[2]); err != nil {
				return
			}
			token.DeviceListPosition.Partition = int32(partition)
			token.DeviceListPosition.Offset = int64(offset)
		default:
			err = fmt.Errorf("unrecognised token type %q", segments[0])
			return
		}
	}
	return token, nil
}

// PrevEventRef represents a reference to a previous event in a state event upgrade
type PrevEventRef struct {
	PrevContent   json.RawMessage `json:"prev_content"`
	ReplacesState string          `json:"replaces_state"`
	PrevSender    string          `json:"prev_sender"`
}

// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
type Response struct {
	NextBatch   StreamingToken `json:"next_batch"`
	AccountData struct {
		Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
	} `json:"account_data"`
	Presence struct {
		Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
	} `json:"presence"`
	Rooms struct {
		Join   map[string]JoinResponse   `json:"join"`
		Peek   map[string]JoinResponse   `json:"peek"`
		Invite map[string]InviteResponse `json:"invite"`
		Leave  map[string]LeaveResponse  `json:"leave"`
	} `json:"rooms"`
	ToDevice struct {
		Events []gomatrixserverlib.SendToDeviceEvent `json:"events"`
	} `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
}

// NewResponse creates an empty response with initialised maps.
func NewResponse() *Response {
	res := Response{}
	// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
	// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
	res.Rooms.Join = map[string]JoinResponse{}
	res.Rooms.Peek = map[string]JoinResponse{}
	res.Rooms.Invite = map[string]InviteResponse{}
	res.Rooms.Leave = map[string]LeaveResponse{}

	// Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
	// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
	//       really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
	//       This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
	res.AccountData.Events = []gomatrixserverlib.ClientEvent{}
	res.Presence.Events = []gomatrixserverlib.ClientEvent{}
	res.ToDevice.Events = []gomatrixserverlib.SendToDeviceEvent{}
	res.DeviceListsOTKCount = map[string]int{}

	return &res
}

// IsEmpty returns true if the response is empty, i.e. used to decided whether
// to return the response immediately to the client or to wait for more data.
func (r *Response) IsEmpty() bool {
	return len(r.Rooms.Join) == 0 &&
		len(r.Rooms.Invite) == 0 &&
		len(r.Rooms.Leave) == 0 &&
		len(r.AccountData.Events) == 0 &&
		len(r.Presence.Events) == 0 &&
		len(r.ToDevice.Events) == 0
}

// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type JoinResponse struct {
	State struct {
		Events []gomatrixserverlib.ClientEvent `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []gomatrixserverlib.ClientEvent `json:"events"`
		Limited   bool                            `json:"limited"`
		PrevBatch *TopologyToken                  `json:"prev_batch,omitempty"`
	} `json:"timeline"`
	Ephemeral struct {
		Events []gomatrixserverlib.ClientEvent `json:"events"`
	} `json:"ephemeral"`
	AccountData struct {
		Events []gomatrixserverlib.ClientEvent `json:"events"`
	} `json:"account_data"`
}

// NewJoinResponse creates an empty response with initialised arrays.
func NewJoinResponse() *JoinResponse {
	res := JoinResponse{}
	res.State.Events = []gomatrixserverlib.ClientEvent{}
	res.Timeline.Events = []gomatrixserverlib.ClientEvent{}
	res.Ephemeral.Events = []gomatrixserverlib.ClientEvent{}
	res.AccountData.Events = []gomatrixserverlib.ClientEvent{}
	return &res
}

// InviteResponse represents a /sync response for a room which is under the 'invite' key.
type InviteResponse struct {
	InviteState struct {
		Events []json.RawMessage `json:"events"`
	} `json:"invite_state"`
}

// NewInviteResponse creates an empty response with initialised arrays.
func NewInviteResponse(event *gomatrixserverlib.HeaderedEvent) *InviteResponse {
	res := InviteResponse{}
	res.InviteState.Events = []json.RawMessage{}

	// First see if there's invite_room_state in the unsigned key of the invite.
	// If there is then unmarshal it into the response. This will contain the
	// partial room state such as join rules, room name etc.
	if inviteRoomState := gjson.GetBytes(event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
		_ = json.Unmarshal([]byte(inviteRoomState.Raw), &res.InviteState.Events)
	}

	// Then we'll see if we can create a partial of the invite event itself.
	// This is needed for clients to work out *who* sent the invite.
	inviteEvent := gomatrixserverlib.ToClientEvent(event.Unwrap(), gomatrixserverlib.FormatSync)
	inviteEvent.Unsigned = nil
	if ev, err := json.Marshal(inviteEvent); err == nil {
		res.InviteState.Events = append(res.InviteState.Events, ev)
	}

	return &res
}

// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
type LeaveResponse struct {
	State struct {
		Events []gomatrixserverlib.ClientEvent `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []gomatrixserverlib.ClientEvent `json:"events"`
		Limited   bool                            `json:"limited"`
		PrevBatch *TopologyToken                  `json:"prev_batch,omitempty"`
	} `json:"timeline"`
}

// NewLeaveResponse creates an empty response with initialised arrays.
func NewLeaveResponse() *LeaveResponse {
	res := LeaveResponse{}
	res.State.Events = []gomatrixserverlib.ClientEvent{}
	res.Timeline.Events = []gomatrixserverlib.ClientEvent{}
	return &res
}

type SendToDeviceEvent struct {
	gomatrixserverlib.SendToDeviceEvent
	ID       StreamPosition
	UserID   string
	DeviceID string
}

type PeekingDevice struct {
	UserID   string
	DeviceID string
}

type Peek struct {
	RoomID  string
	New     bool
	Deleted bool
}