|
- package client
-
- import (
- "container/list"
- "time"
-
- "github.com/juju/errors"
- "github.com/ngaut/log"
- "github.com/ngaut/tso/proto"
- "github.com/ngaut/tso/util"
- "github.com/ngaut/zkhelper"
- )
-
// maxPipelineRequest is the capacity of the buffered channel on which a
// Client queues timestamp requests before they are written to the server.
const maxPipelineRequest = 100000
-
- // Client is a timestamp oracle client.
- type Client struct {
- requests chan *PipelineRequest
-
- pending *list.List
- conf *Conf
-
- addr string
-
- leaderCh chan string
- }
-
// Conf is the client configuration.
type Conf struct {
	// ServerAddr is the tso server address. It will be deprecated later and
	// is kept only for simple tests and backward compatibility.
	ServerAddr string

	// ZKAddr is the zookeeper address. If it is set, the client ignores
	// ServerAddr and looks up the leader tso server address in zookeeper.
	ZKAddr string

	// RootPath is the zookeeper path under which the tso server saves its
	// data, like /zk/tso.
	RootPath string
}
-
- // PipelineRequest let you get the timestamp with pipeline.
- type PipelineRequest struct {
- done chan error
- reply *proto.Response
- }
-
- func newPipelineRequest() *PipelineRequest {
- return &PipelineRequest{
- done: make(chan error, 1),
- }
- }
-
- // MarkDone sets the repsone for current request.
- func (pr *PipelineRequest) MarkDone(reply *proto.Response, err error) {
- if err != nil {
- pr.reply = nil
- }
- pr.reply = reply
- pr.done <- errors.Trace(err)
- }
-
- // GetTS gets the timestamp.
- func (pr *PipelineRequest) GetTS() (*proto.Timestamp, error) {
- err := <-pr.done
- if err != nil {
- return nil, errors.Trace(err)
- }
-
- return &pr.reply.Timestamp, nil
- }
-
- // NewClient creates a timestamp oracle client.
- func NewClient(conf *Conf) *Client {
- c := &Client{
- requests: make(chan *PipelineRequest, maxPipelineRequest),
- pending: list.New(),
- conf: conf,
- leaderCh: make(chan string, 1),
- }
-
- if len(conf.ZKAddr) == 0 {
- c.leaderCh <- conf.ServerAddr
- } else {
- go c.watchLeader()
- }
-
- go c.workerLoop()
-
- return c
- }
-
- func (c *Client) cleanupPending(err error) {
- log.Warn(err)
- length := c.pending.Len()
- for i := 0; i < length; i++ {
- e := c.pending.Front()
- c.pending.Remove(e)
- e.Value.(*PipelineRequest).MarkDone(nil, err)
- }
-
- // clear request in channel too
- length = len(c.requests)
- for i := 0; i < length; i++ {
- req := <-c.requests
- req.MarkDone(nil, err)
- }
- }
-
- func (c *Client) notifyOne(reply *proto.Response) {
- e := c.pending.Front()
- c.pending.Remove(e)
- req := e.Value.(*PipelineRequest)
- req.MarkDone(reply, nil)
- }
-
- func (c *Client) writeRequests(session *Conn) error {
- var protoHdr [1]byte
- for i := 0; i < c.pending.Len(); i++ {
- session.Write(protoHdr[:])
- }
- return session.Flush()
- }
-
- func (c *Client) handleResponse(session *Conn) error {
- length := c.pending.Len()
- for i := 0; i < length; i++ {
- var resp proto.Response
- err := resp.Decode(session)
- if err != nil {
- return errors.Trace(err)
- }
- c.notifyOne(&resp)
- }
-
- return nil
- }
-
- func (c *Client) do() error {
- session, err := NewConnection(c.addr, time.Duration(1*time.Second))
- if err != nil {
- return errors.Trace(err)
- }
-
- log.Debugf("connect tso server %s ok", c.addr)
-
- defer session.Close()
- for {
- select {
- case req := <-c.requests:
- c.pending.PushBack(req)
- length := len(c.requests)
- for i := 0; i < length; i++ {
- req = <-c.requests
- c.pending.PushBack(req)
- }
-
- err = c.writeRequests(session)
- if err != nil {
- return errors.Trace(err)
- }
- err = c.handleResponse(session)
- if err != nil {
- return errors.Trace(err)
- }
- case addr := <-c.leaderCh:
- oldAddr := c.addr
- c.addr = addr
- return errors.Errorf("leader change %s -> %s", oldAddr, addr)
- }
- }
- }
-
- func (c *Client) workerLoop() {
- // first get tso leader
- c.addr = <-c.leaderCh
- log.Debugf("try to connect tso server %s", c.addr)
-
- for {
- err := c.do()
- if err != nil {
- c.cleanupPending(err)
- }
- select {
- case <-time.After(1 * time.Second):
- case addr := <-c.leaderCh:
- // If old tso server down, NewConnection will fail and return immediately in do function,
- // so we must check leader change here.
- log.Warnf("leader change %s -> %s", c.addr, addr)
- c.addr = addr
- // Wait some time to let tso server allow accepting connections.
- time.Sleep(1 * time.Second)
- }
- }
- }
-
- func (c *Client) watchLeader() {
- var (
- conn zkhelper.Conn
- err error
- )
-
- for {
- conn, err = zkhelper.ConnectToZkWithTimeout(c.conf.ZKAddr, time.Second)
- if err != nil {
- log.Errorf("connect zk err %v, retry later", err)
- time.Sleep(3 * time.Second)
- continue
- }
- break
- }
-
- defer conn.Close()
-
- var lastAddr string
-
- for {
- addr, watcher, err := util.GetWatchLeader(conn, c.conf.RootPath)
- if err != nil {
- log.Errorf("get tso leader err %v, retry later", err)
- time.Sleep(3 * time.Second)
- continue
- }
-
- if lastAddr != addr {
- log.Warnf("leader change %s -> %s", lastAddr, addr)
- lastAddr = addr
- c.leaderCh <- addr
- }
-
- // watch the leader changes.
- <-watcher
- }
- }
-
- // GoGetTimestamp returns a PipelineRequest so you can get the timestamp later.
- func (c *Client) GoGetTimestamp() *PipelineRequest {
- pr := newPipelineRequest()
- c.requests <- pr
- return pr
- }
|