// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package hbasekv

import (
	"fmt"

	"github.com/juju/errors"
	"github.com/ngaut/log"
	"github.com/pingcap/go-hbase"
	"github.com/pingcap/go-themis"
	"github.com/pingcap/tidb/kv"
)

var (
	_ kv.Transaction = (*hbaseTxn)(nil)
)

// dbTxn implements kv.Transacton. It is not thread safe.
type hbaseTxn struct {
	us        kv.UnionStore
	txn       themis.Txn
	store     *hbaseStore // for commit
	storeName string
	tid       uint64
	valid     bool
	version   kv.Version // commit version
	dirty     bool
}

func newHbaseTxn(t themis.Txn, storeName string) *hbaseTxn {
	return &hbaseTxn{
		txn:       t,
		valid:     true,
		storeName: storeName,
		tid:       t.GetStartTS(),
		us:        kv.NewUnionStore(newHbaseSnapshot(t, storeName)),
	}
}

// Implement transaction interface

func (txn *hbaseTxn) Get(k kv.Key) ([]byte, error) {
	log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid)
	return txn.us.Get(k)
}

func (txn *hbaseTxn) Set(k kv.Key, v []byte) error {
	log.Debugf("[kv] set %q txn:%d", k, txn.tid)
	txn.dirty = true
	return txn.us.Set(k, v)
}

func (txn *hbaseTxn) String() string {
	return fmt.Sprintf("%d", txn.tid)
}

func (txn *hbaseTxn) Seek(k kv.Key) (kv.Iterator, error) {
	log.Debugf("[kv] seek %q txn:%d", k, txn.tid)
	return txn.us.Seek(k)
}

func (txn *hbaseTxn) Delete(k kv.Key) error {
	log.Debugf("[kv] delete %q txn:%d", k, txn.tid)
	txn.dirty = true
	return txn.us.Delete(k)
}

func (txn *hbaseTxn) SetOption(opt kv.Option, val interface{}) {
	txn.us.SetOption(opt, val)
}

func (txn *hbaseTxn) DelOption(opt kv.Option) {
	txn.us.DelOption(opt)
}

func (txn *hbaseTxn) doCommit() error {
	if err := txn.us.CheckLazyConditionPairs(); err != nil {
		return errors.Trace(err)
	}

	err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error {
		row := append([]byte(nil), k...)
		if len(v) == 0 { // Deleted marker
			d := hbase.NewDelete(row)
			d.AddStringColumn(hbaseColFamily, hbaseQualifier)
			err := txn.txn.Delete(txn.storeName, d)
			if err != nil {
				return errors.Trace(err)
			}
		} else {
			val := append([]byte(nil), v...)
			p := hbase.NewPut(row)
			p.AddValue(hbaseColFamilyBytes, hbaseQualifierBytes, val)
			txn.txn.Put(txn.storeName, p)
		}
		return nil
	})

	if err != nil {
		return errors.Trace(err)
	}

	err = txn.txn.Commit()
	if err != nil {
		log.Error(err)
		return errors.Trace(err)
	}

	txn.version = kv.NewVersion(txn.txn.GetCommitTS())
	log.Debugf("[kv] commit successfully, txn.version:%d", txn.version.Ver)
	return nil
}

func (txn *hbaseTxn) Commit() error {
	if !txn.valid {
		return kv.ErrInvalidTxn
	}
	log.Debugf("[kv] start to commit txn %d", txn.tid)
	defer func() {
		txn.close()
	}()
	return txn.doCommit()
}

func (txn *hbaseTxn) close() error {
	txn.us.Release()
	txn.valid = false
	return nil
}

//if fail, themis auto rollback
func (txn *hbaseTxn) Rollback() error {
	if !txn.valid {
		return kv.ErrInvalidTxn
	}
	log.Warnf("[kv] Rollback txn %d", txn.tid)
	return txn.close()
}

func (txn *hbaseTxn) LockKeys(keys ...kv.Key) error {
	for _, key := range keys {
		if err := txn.txn.LockRow(txn.storeName, key); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

func (txn *hbaseTxn) IsReadOnly() bool {
	return !txn.dirty
}

func (txn *hbaseTxn) StartTS() int64 {
	return int64(txn.tid)
}

func (txn *hbaseTxn) GetClient() kv.Client {
	return nil
}

type hbaseClient struct {
}

func (c *hbaseClient) SupportRequestType(reqType, subType int64) bool {
	return false
}

func (c *hbaseClient) Send(req *kv.Request) kv.Response {
	return nil
}