|
- // Copyright 2015 PingCAP, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // See the License for the specific language governing permissions and
- // limitations under the License.
-
- package domain
-
- import (
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/juju/errors"
- "github.com/ngaut/log"
- "github.com/pingcap/tidb/ddl"
- "github.com/pingcap/tidb/infoschema"
- "github.com/pingcap/tidb/kv"
- "github.com/pingcap/tidb/meta"
- "github.com/pingcap/tidb/model"
- "github.com/pingcap/tidb/sessionctx/variable"
- "github.com/pingcap/tidb/store/localstore"
- "github.com/pingcap/tidb/terror"
- )
-
- var ddlLastReloadSchemaTS = "ddl_last_reload_schema_ts"
-
- // Domain represents a storage space. Different domains can use the same database name.
- // Multiple domains can be used in parallel without synchronization.
- type Domain struct {
- store kv.Storage
- infoHandle *infoschema.Handle
- ddl ddl.DDL
- leaseCh chan time.Duration
- // nano seconds
- lastLeaseTS int64
- m sync.Mutex
- }
-
- func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
- m := meta.NewMeta(txn)
- schemaMetaVersion, err := m.GetSchemaVersion()
- if err != nil {
- return errors.Trace(err)
- }
-
- info := do.infoHandle.Get()
- if info != nil && schemaMetaVersion <= info.SchemaMetaVersion() {
- // info may be changed by other txn, so here its version may be bigger than schema version,
- // so we don't need to reload.
- log.Debugf("[ddl] schema version is still %d, no need reload", schemaMetaVersion)
- return nil
- }
-
- schemas, err := m.ListDatabases()
- if err != nil {
- return errors.Trace(err)
- }
-
- for _, di := range schemas {
- if di.State != model.StatePublic {
- // schema is not public, can't be used outside.
- continue
- }
-
- tables, err1 := m.ListTables(di.ID)
- if err1 != nil {
- return errors.Trace(err1)
- }
-
- di.Tables = make([]*model.TableInfo, 0, len(tables))
- for _, tbl := range tables {
- if tbl.State != model.StatePublic {
- // schema is not public, can't be used outsiee.
- continue
- }
- di.Tables = append(di.Tables, tbl)
- }
- }
-
- log.Infof("[ddl] loadInfoSchema %d", schemaMetaVersion)
- err = do.infoHandle.Set(schemas, schemaMetaVersion)
- return errors.Trace(err)
- }
-
- // InfoSchema gets information schema from domain.
- func (do *Domain) InfoSchema() infoschema.InfoSchema {
- // try reload if possible.
- do.tryReload()
- return do.infoHandle.Get()
- }
-
- // DDL gets DDL from domain.
- func (do *Domain) DDL() ddl.DDL {
- return do.ddl
- }
-
- // Store gets KV store from domain.
- func (do *Domain) Store() kv.Storage {
- return do.store
- }
-
- // SetLease will reset the lease time for online DDL change.
- func (do *Domain) SetLease(lease time.Duration) {
- do.leaseCh <- lease
-
- // let ddl to reset lease too.
- do.ddl.SetLease(lease)
- }
-
- // Stats returns the domain statistic.
- func (do *Domain) Stats() (map[string]interface{}, error) {
- m := make(map[string]interface{})
- m[ddlLastReloadSchemaTS] = atomic.LoadInt64(&do.lastLeaseTS) / 1e9
-
- return m, nil
- }
-
- // GetScope gets the status variables scope.
- func (do *Domain) GetScope(status string) variable.ScopeFlag {
- // Now domain status variables scope are all default scope.
- return variable.DefaultScopeFlag
- }
-
- func (do *Domain) tryReload() {
- // if we don't have update the schema for a long time > lease, we must force reloading it.
- // Although we try to reload schema every lease time in a goroutine, sometimes it may not
- // run accurately, e.g, the machine has a very high load, running the ticker is delayed.
- last := atomic.LoadInt64(&do.lastLeaseTS)
- lease := do.ddl.GetLease()
-
- // if lease is 0, we use the local store, so no need to reload.
- if lease > 0 && time.Now().UnixNano()-last > lease.Nanoseconds() {
- do.mustReload()
- }
- }
-
- const minReloadTimeout = 20 * time.Second
-
- func (do *Domain) reload() error {
- // lock here for only once at same time.
- do.m.Lock()
- defer do.m.Unlock()
-
- timeout := do.ddl.GetLease() / 2
- if timeout < minReloadTimeout {
- timeout = minReloadTimeout
- }
-
- done := make(chan error, 1)
- go func() {
- var err error
-
- for {
- err = kv.RunInNewTxn(do.store, false, do.loadInfoSchema)
- // if err is db closed, we will return it directly, otherwise, we will
- // check reloading again.
- if terror.ErrorEqual(err, localstore.ErrDBClosed) {
- break
- }
-
- if err != nil {
- log.Errorf("[ddl] load schema err %v, retry again", errors.ErrorStack(err))
- // TODO: use a backoff algorithm.
- time.Sleep(500 * time.Millisecond)
- continue
- }
-
- atomic.StoreInt64(&do.lastLeaseTS, time.Now().UnixNano())
- break
- }
-
- done <- err
- }()
-
- select {
- case err := <-done:
- return errors.Trace(err)
- case <-time.After(timeout):
- return errors.New("reload schema timeout")
- }
- }
-
- func (do *Domain) mustReload() {
- // if reload error, we will terminate whole program to guarantee data safe.
- err := do.reload()
- if err != nil {
- log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err))
- }
- }
-
- // check schema every 300 seconds default.
- const defaultLoadTime = 300 * time.Second
-
- func (do *Domain) loadSchemaInLoop(lease time.Duration) {
- if lease <= 0 {
- lease = defaultLoadTime
- }
-
- ticker := time.NewTicker(lease)
- defer ticker.Stop()
-
- for {
- select {
- case <-ticker.C:
- err := do.reload()
- // we may close store in test, but the domain load schema loop is still checking,
- // so we can't panic for ErrDBClosed and just return here.
- if terror.ErrorEqual(err, localstore.ErrDBClosed) {
- return
- } else if err != nil {
- log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err))
- }
- case newLease := <-do.leaseCh:
- if newLease <= 0 {
- newLease = defaultLoadTime
- }
-
- if lease == newLease {
- // nothing to do
- continue
- }
-
- lease = newLease
- // reset ticker too.
- ticker.Stop()
- ticker = time.NewTicker(lease)
- }
- }
- }
-
- type ddlCallback struct {
- ddl.BaseCallback
- do *Domain
- }
-
- func (c *ddlCallback) OnChanged(err error) error {
- if err != nil {
- return err
- }
- log.Warnf("[ddl] on DDL change")
-
- c.do.mustReload()
- return nil
- }
-
- // NewDomain creates a new domain.
- func NewDomain(store kv.Storage, lease time.Duration) (d *Domain, err error) {
- d = &Domain{
- store: store,
- leaseCh: make(chan time.Duration, 1),
- }
-
- d.infoHandle = infoschema.NewHandle(d.store)
- d.ddl = ddl.NewDDL(d.store, d.infoHandle, &ddlCallback{do: d}, lease)
- d.mustReload()
-
- variable.RegisterStatistics(d)
-
- go d.loadSchemaInLoop(lease)
-
- return d, nil
- }
|