// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
|
||
|
|
||
|
package hbasekv
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"math/rand"
|
||
|
"net/url"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/juju/errors"
|
||
|
"github.com/ngaut/log"
|
||
|
"github.com/pingcap/go-hbase"
|
||
|
"github.com/pingcap/go-themis"
|
||
|
"github.com/pingcap/go-themis/oracle"
|
||
|
"github.com/pingcap/go-themis/oracle/oracles"
|
||
|
"github.com/pingcap/tidb/kv"
|
||
|
)
|
||
|
|
||
|
const (
	// hbaseColFamily is the hbase column family name.
	hbaseColFamily = "f"
	// hbaseQualifier is the hbase column name.
	hbaseQualifier = "q"
	// hbaseFmlAndQual is a shortcut for the combined "family:qualifier" string.
	hbaseFmlAndQual = hbaseColFamily + ":" + hbaseQualifier
	// hbaseConnPoolSize is the fixed number of HBase connections held by each
	// store's connection pool.
	hbaseConnPoolSize = 10
)
|
||
|
|
||
|
var (
	// Byte-slice forms of the column family and qualifier names.
	hbaseColFamilyBytes = []byte(hbaseColFamily)
	hbaseQualifierBytes = []byte(hbaseQualifier)
)

var (
	// Compile-time assertion that *hbaseStore satisfies kv.Storage.
	_ kv.Storage = (*hbaseStore)(nil)
)

var (
	// ErrInvalidDSN is returned when store dsn is invalid.
	ErrInvalidDSN = errors.New("invalid dsn")
)
|
||
|
|
||
|
// storeCache is a mutex-guarded registry of opened stores, keyed by store
// UUID.
type storeCache struct {
	// mu guards cache.
	mu sync.Mutex
	// cache maps a store UUID to its opened store.
	cache map[string]*hbaseStore
}
|
||
|
|
||
|
var mc storeCache
|
||
|
|
||
|
func init() {
|
||
|
mc.cache = make(map[string]*hbaseStore)
|
||
|
rand.Seed(time.Now().UnixNano())
|
||
|
}
|
||
|
|
||
|
// hbaseStore is the kv.Storage implementation backed by HBase.
type hbaseStore struct {
	// mu is held by Begin while a new transaction is being created.
	mu sync.Mutex
	// uuid identifies the store and is its key in the package-level cache.
	uuid string
	// storeName is the HBase table name handed to new transactions and
	// snapshots.
	storeName string
	// oracle is the timestamp oracle passed to themis.NewTxn.
	oracle oracle.Oracle
	// conns is the pool of HBase connections shared by transactions.
	conns []hbase.HBaseClient
}
|
||
|
|
||
|
func (s *hbaseStore) getHBaseClient() hbase.HBaseClient {
|
||
|
// return hbase connection randomly
|
||
|
return s.conns[rand.Intn(hbaseConnPoolSize)]
|
||
|
}
|
||
|
|
||
|
func (s *hbaseStore) Begin() (kv.Transaction, error) {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
hbaseCli := s.getHBaseClient()
|
||
|
t, err := themis.NewTxn(hbaseCli, s.oracle)
|
||
|
if err != nil {
|
||
|
return nil, errors.Trace(err)
|
||
|
}
|
||
|
txn := newHbaseTxn(t, s.storeName)
|
||
|
return txn, nil
|
||
|
}
|
||
|
|
||
|
func (s *hbaseStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) {
|
||
|
hbaseCli := s.getHBaseClient()
|
||
|
t, err := themis.NewTxn(hbaseCli, s.oracle)
|
||
|
if err != nil {
|
||
|
return nil, errors.Trace(err)
|
||
|
}
|
||
|
return newHbaseSnapshot(t, s.storeName), nil
|
||
|
}
|
||
|
|
||
|
func (s *hbaseStore) Close() error {
|
||
|
mc.mu.Lock()
|
||
|
defer mc.mu.Unlock()
|
||
|
|
||
|
delete(mc.cache, s.uuid)
|
||
|
|
||
|
var err error
|
||
|
for _, conn := range s.conns {
|
||
|
err = conn.Close()
|
||
|
if err != nil {
|
||
|
log.Error(err)
|
||
|
}
|
||
|
}
|
||
|
// return last error
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// UUID returns the store's identifier, which is also its key in the
// package-level store cache.
func (s *hbaseStore) UUID() string {
	return s.uuid
}
|
||
|
|
||
|
func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
|
||
|
hbaseCli := s.getHBaseClient()
|
||
|
t, err := themis.NewTxn(hbaseCli, s.oracle)
|
||
|
if err != nil {
|
||
|
return kv.Version{Ver: 0}, errors.Trace(err)
|
||
|
}
|
||
|
defer t.Release()
|
||
|
|
||
|
return kv.Version{Ver: t.GetStartTS()}, nil
|
||
|
}
|
||
|
|
||
|
// Driver implements engine Driver.
//
// It carries no state of its own; opened stores are tracked in the
// package-level store cache.
type Driver struct {
}
|
||
|
|
||
|
const (
	// tsoTypeLocal is the "tso" DSN value that selects the local oracle.
	tsoTypeLocal = "local"
	// tsoTypeZK is the "tso" DSN value that selects the remote oracle.
	tsoTypeZK = "zk"

	// tsoZKPath is the path handed to the remote oracle constructor.
	tsoZKPath = "/zk/tso"
)
|
||
|
|
||
|
// Open opens or creates an HBase storage with given path.
|
||
|
//
|
||
|
// The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=local|zk]'.
|
||
|
// If tso is not provided, it will use a local oracle instead. (for test only)
|
||
|
func (d Driver) Open(path string) (kv.Storage, error) {
|
||
|
mc.mu.Lock()
|
||
|
defer mc.mu.Unlock()
|
||
|
|
||
|
zks, tso, tableName, err := parsePath(path)
|
||
|
if err != nil {
|
||
|
return nil, errors.Trace(err)
|
||
|
}
|
||
|
if tso != tsoTypeLocal && tso != tsoTypeZK {
|
||
|
return nil, errors.Trace(ErrInvalidDSN)
|
||
|
}
|
||
|
|
||
|
uuid := fmt.Sprintf("hbase-%v-%v", zks, tableName)
|
||
|
if tso == tsoTypeLocal {
|
||
|
log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid)
|
||
|
}
|
||
|
if store, ok := mc.cache[uuid]; ok {
|
||
|
return store, nil
|
||
|
}
|
||
|
|
||
|
// create buffered HBase connections, HBaseClient is goroutine-safe, so
|
||
|
// it's OK to redistribute to transactions.
|
||
|
conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize)
|
||
|
for i := 0; i < hbaseConnPoolSize; i++ {
|
||
|
var c hbase.HBaseClient
|
||
|
c, err = hbase.NewClient(strings.Split(zks, ","), "/hbase")
|
||
|
if err != nil {
|
||
|
return nil, errors.Trace(err)
|
||
|
}
|
||
|
conns = append(conns, c)
|
||
|
}
|
||
|
|
||
|
c := conns[0]
|
||
|
var b bool
|
||
|
b, err = c.TableExists(tableName)
|
||
|
if err != nil {
|
||
|
return nil, errors.Trace(err)
|
||
|
}
|
||
|
if !b {
|
||
|
// Create new hbase table for store.
|
||
|
t := hbase.NewTableDesciptor(tableName)
|
||
|
cf := hbase.NewColumnFamilyDescriptor(hbaseColFamily)
|
||
|
cf.AddAttr("THEMIS_ENABLE", "true")
|
||
|
t.AddColumnDesc(cf)
|
||
|
//TODO: specify split?
|
||
|
if err := c.CreateTable(t, nil); err != nil {
|
||
|
return nil, errors.Trace(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var ora oracle.Oracle
|
||
|
switch tso {
|
||
|
case tsoTypeLocal:
|
||
|
ora = oracles.NewLocalOracle()
|
||
|
case tsoTypeZK:
|
||
|
ora = oracles.NewRemoteOracle(zks, tsoZKPath)
|
||
|
}
|
||
|
|
||
|
s := &hbaseStore{
|
||
|
uuid: uuid,
|
||
|
storeName: tableName,
|
||
|
oracle: ora,
|
||
|
conns: conns,
|
||
|
}
|
||
|
mc.cache[uuid] = s
|
||
|
return s, nil
|
||
|
}
|
||
|
|
||
|
func parsePath(path string) (zks, tso, tableName string, err error) {
|
||
|
u, err := url.Parse(path)
|
||
|
if err != nil {
|
||
|
return "", "", "", errors.Trace(err)
|
||
|
}
|
||
|
if strings.ToLower(u.Scheme) != "hbase" {
|
||
|
return "", "", "", errors.Trace(ErrInvalidDSN)
|
||
|
}
|
||
|
p, tableName := filepath.Split(u.Path)
|
||
|
if p != "/" {
|
||
|
return "", "", "", errors.Trace(ErrInvalidDSN)
|
||
|
}
|
||
|
zks = u.Host
|
||
|
tso = u.Query().Get("tso")
|
||
|
if tso == "" {
|
||
|
tso = tsoTypeLocal
|
||
|
}
|
||
|
return zks, tso, tableName, nil
|
||
|
}
|