|
- package hbase
-
- import (
- "bytes"
-
- pb "github.com/golang/protobuf/proto"
- "github.com/juju/errors"
- "github.com/ngaut/log"
- "github.com/pingcap/go-hbase/proto"
- )
-
- // nextKey returns the next key in byte-order.
- // for example:
- // nil -> [0]
- // [] -> [0]
- // [0] -> [1]
- // [1, 2, 3] -> [1, 2, 4]
- // [1, 255] -> [2, 0]
- // [255] -> [0, 0]
- func nextKey(data []byte) []byte {
- // nil or []byte{}
- dataLen := len(data)
- if dataLen == 0 {
- return []byte{0}
- }
-
- // Check and process carry bit.
- i := dataLen - 1
- data[i]++
- for i > 0 {
- if data[i] == 0 {
- i--
- data[i]++
- } else {
- break
- }
- }
-
- // Check whether need to add another byte for carry bit,
- // like [255] -> [0, 0]
- if data[i] == 0 {
- data = append([]byte{0}, data...)
- }
-
- return data
- }
-
- const (
- defaultScanMaxRetries = 3
- )
-
- type Scan struct {
- client *client
- id uint64
- table []byte
- // row key
- StartRow []byte
- StopRow []byte
- families [][]byte
- qualifiers [][][]byte
- nextStartKey []byte
- numCached int
- closed bool
- location *RegionInfo
- server *connection
- cache []*ResultRow
- attrs map[string][]byte
- MaxVersions uint32
- TsRangeFrom uint64
- TsRangeTo uint64
- lastResult *ResultRow
- // if region split, set startKey = lastResult.Row, but must skip the first
- skipFirst bool
- maxRetries int
- }
-
- func NewScan(table []byte, batchSize int, c HBaseClient) *Scan {
- if batchSize <= 0 {
- batchSize = 100
- }
- return &Scan{
- client: c.(*client),
- table: table,
- nextStartKey: nil,
- families: make([][]byte, 0),
- qualifiers: make([][][]byte, 0),
- numCached: batchSize,
- closed: false,
- attrs: make(map[string][]byte),
- maxRetries: defaultScanMaxRetries,
- }
- }
-
- func (s *Scan) Close() error {
- if s.closed {
- return nil
- }
-
- err := s.closeScan(s.server, s.location, s.id)
- if err != nil {
- return errors.Trace(err)
- }
-
- s.closed = true
- return nil
- }
-
- func (s *Scan) AddColumn(family, qual []byte) {
- s.AddFamily(family)
- pos := s.posOfFamily(family)
- s.qualifiers[pos] = append(s.qualifiers[pos], qual)
- }
-
- func (s *Scan) AddStringColumn(family, qual string) {
- s.AddColumn([]byte(family), []byte(qual))
- }
-
- func (s *Scan) AddFamily(family []byte) {
- pos := s.posOfFamily(family)
- if pos == -1 {
- s.families = append(s.families, family)
- s.qualifiers = append(s.qualifiers, make([][]byte, 0))
- }
- }
-
- func (s *Scan) AddStringFamily(family string) {
- s.AddFamily([]byte(family))
- }
-
- func (s *Scan) posOfFamily(family []byte) int {
- for p, v := range s.families {
- if bytes.Equal(family, v) {
- return p
- }
- }
- return -1
- }
-
- func (s *Scan) AddAttr(name string, val []byte) {
- s.attrs[name] = val
- }
-
- func (s *Scan) AddTimeRange(from uint64, to uint64) {
- s.TsRangeFrom = from
- s.TsRangeTo = to
- }
-
- func (s *Scan) Closed() bool {
- return s.closed
- }
-
- func (s *Scan) CreateGetFromScan(row []byte) *Get {
- g := NewGet(row)
- for i, family := range s.families {
- if len(s.qualifiers[i]) > 0 {
- for _, qual := range s.qualifiers[i] {
- g.AddColumn(family, qual)
- }
- } else {
- g.AddFamily(family)
- }
- }
- return g
- }
-
- func (s *Scan) getData(startKey []byte, retries int) ([]*ResultRow, error) {
- server, location, err := s.getServerAndLocation(s.table, startKey)
- if err != nil {
- return nil, errors.Trace(err)
- }
-
- req := &proto.ScanRequest{
- Region: &proto.RegionSpecifier{
- Type: proto.RegionSpecifier_REGION_NAME.Enum(),
- Value: []byte(location.Name),
- },
- NumberOfRows: pb.Uint32(uint32(s.numCached)),
- Scan: &proto.Scan{},
- }
-
- // set attributes
- var attrs []*proto.NameBytesPair
- for k, v := range s.attrs {
- p := &proto.NameBytesPair{
- Name: pb.String(k),
- Value: v,
- }
- attrs = append(attrs, p)
- }
- if len(attrs) > 0 {
- req.Scan.Attribute = attrs
- }
-
- if s.id > 0 {
- req.ScannerId = pb.Uint64(s.id)
- }
- req.Scan.StartRow = startKey
- if s.StopRow != nil {
- req.Scan.StopRow = s.StopRow
- }
- if s.MaxVersions > 0 {
- req.Scan.MaxVersions = &s.MaxVersions
- }
- if s.TsRangeTo > s.TsRangeFrom {
- req.Scan.TimeRange = &proto.TimeRange{
- From: pb.Uint64(s.TsRangeFrom),
- To: pb.Uint64(s.TsRangeTo),
- }
- }
-
- for i, v := range s.families {
- req.Scan.Column = append(req.Scan.Column, &proto.Column{
- Family: v,
- Qualifier: s.qualifiers[i],
- })
- }
-
- cl := newCall(req)
- err = server.call(cl)
- if err != nil {
- return nil, errors.Trace(err)
- }
-
- msg := <-cl.responseCh
- rs, err := s.processResponse(msg)
- if err != nil && (isNotInRegionError(err) || isUnknownScannerError(err)) {
- if retries <= s.maxRetries {
- // clean this table region cache and try again
- s.client.CleanRegionCache(s.table)
- // create new scanner and set startRow to lastResult
- s.id = 0
- if s.lastResult != nil {
- startKey = s.lastResult.Row
- s.skipFirst = true
- }
- s.server = nil
- s.location = nil
- log.Warnf("Retryint get data for %d time(s)", retries+1)
- retrySleep(retries + 1)
- return s.getData(startKey, retries+1)
- }
- }
- return rs, nil
- }
-
- func (s *Scan) processResponse(response pb.Message) ([]*ResultRow, error) {
- var res *proto.ScanResponse
- switch r := response.(type) {
- case *proto.ScanResponse:
- res = r
- case *exception:
- return nil, errors.New(r.msg)
- default:
- return nil, errors.Errorf("Invalid response seen [response: %#v]", response)
- }
-
- // Check whether response is nil.
- if res == nil {
- return nil, errors.Errorf("Empty response: [table=%s] [StartRow=%q] [StopRow=%q] ", s.table, s.StartRow, s.StopRow)
- }
-
- nextRegion := true
- s.nextStartKey = nil
- s.id = res.GetScannerId()
-
- results := res.GetResults()
- n := len(results)
-
- if (n == s.numCached) ||
- len(s.location.EndKey) == 0 ||
- (s.StopRow != nil && bytes.Compare(s.location.EndKey, s.StopRow) > 0 && n < s.numCached) ||
- res.GetMoreResultsInRegion() {
- nextRegion = false
- }
-
- var err error
- if nextRegion {
- s.nextStartKey = s.location.EndKey
- err = s.closeScan(s.server, s.location, s.id)
- if err != nil {
- return nil, errors.Trace(err)
- }
- s.server = nil
- s.location = nil
- s.id = 0
- }
-
- if n == 0 && !nextRegion {
- err = s.Close()
- if err != nil {
- return nil, errors.Trace(err)
- }
- }
-
- if s.skipFirst {
- results = results[1:]
- s.skipFirst = false
- n = len(results)
- }
-
- tbr := make([]*ResultRow, n)
- for i, v := range results {
- if v != nil {
- tbr[i] = NewResultRow(v)
- }
- }
-
- return tbr, nil
- }
-
- func (s *Scan) nextBatch() int {
- startKey := s.nextStartKey
- if startKey == nil {
- startKey = s.StartRow
- }
-
- // Notice: ignore error here.
- // TODO: add error check, now only add a log.
- rs, err := s.getData(startKey, 0)
- if err != nil {
- log.Errorf("scan next batch failed - [startKey=%q], %v", startKey, errors.ErrorStack(err))
- }
-
- // Current region get 0 data, try switch to next region.
- if len(rs) == 0 && len(s.nextStartKey) > 0 {
- // TODO: add error check, now only add a log.
- rs, err = s.getData(s.nextStartKey, 0)
- if err != nil {
- log.Errorf("scan next batch failed - [startKey=%q], %v", s.nextStartKey, errors.ErrorStack(err))
- }
- }
-
- s.cache = rs
- return len(s.cache)
- }
-
- func (s *Scan) Next() *ResultRow {
- if s.closed {
- return nil
- }
- var ret *ResultRow
- if len(s.cache) == 0 {
- n := s.nextBatch()
- // no data returned
- if n == 0 {
- return nil
- }
- }
-
- ret = s.cache[0]
- s.lastResult = ret
- s.cache = s.cache[1:]
- return ret
- }
-
- func (s *Scan) closeScan(server *connection, location *RegionInfo, id uint64) error {
- if server == nil || location == nil {
- return nil
- }
-
- req := &proto.ScanRequest{
- Region: &proto.RegionSpecifier{
- Type: proto.RegionSpecifier_REGION_NAME.Enum(),
- Value: []byte(location.Name),
- },
- ScannerId: pb.Uint64(id),
- CloseScanner: pb.Bool(true),
- }
-
- cl := newCall(req)
- err := server.call(cl)
- if err != nil {
- return errors.Trace(err)
- }
-
- // TODO: add exception check.
- <-cl.responseCh
- return nil
- }
-
- func (s *Scan) getServerAndLocation(table, startRow []byte) (*connection, *RegionInfo, error) {
- if s.server != nil && s.location != nil {
- return s.server, s.location, nil
- }
-
- var err error
- s.location, err = s.client.LocateRegion(table, startRow, true)
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
-
- s.server, err = s.client.getClientConn(s.location.Server)
- if err != nil {
- return nil, nil, errors.Trace(err)
- }
- return s.server, s.location, nil
- }
|