Browse Source

增加锁注册服务信息机制

pull/29/head
Sydonian 2 years ago
parent
commit
763bf968ff
10 changed files with 496 additions and 416 deletions
  1. +27
    -5
      pkgs/distlock/internal/acquire_actor.go
  2. +9
    -2
      pkgs/distlock/internal/lease_actor.go
  3. +30
    -6
      pkgs/distlock/internal/models.go
  4. +85
    -94
      pkgs/distlock/internal/providers_actor.go
  5. +30
    -5
      pkgs/distlock/internal/release_actor.go
  6. +136
    -0
      pkgs/distlock/internal/service_info_actor.go
  7. +0
    -116
      pkgs/distlock/internal/utils.go
  8. +0
    -61
      pkgs/distlock/internal/utils_test.go
  9. +102
    -62
      pkgs/distlock/internal/watch_etcd_actor.go
  10. +77
    -65
      pkgs/distlock/service.go

+ 27
- 5
pkgs/distlock/internal/acquire_actor.go View File

@@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
@@ -28,6 +29,7 @@ type AcquireActor struct {
etcdCli *clientv3.Client
providersActor *ProvidersActor

serviceID string
acquirings []*acquireInfo
lock sync.Mutex
}
@@ -98,6 +100,21 @@ func (a *AcquireActor) TryAcquireNow() {
}()
}

// ResetState records serviceID as the ID stamped onto subsequently submitted
// lock requests and aborts every acquire request that is still pending.
//
// Each pending request's callback is completed with the request's last
// recorded error, or with ErrAcquiringTimeout when no error was recorded.
// The pending list is emptied afterwards.
func (a *AcquireActor) ResetState(serviceID string) {
	a.lock.Lock()
	defer a.lock.Unlock()

	a.serviceID = serviceID

	for _, pending := range a.acquirings {
		// Prefer the concrete failure seen on the last attempt; fall back to
		// the generic timeout error when the request never failed.
		failure := pending.LastErr
		if failure == nil {
			failure = ErrAcquiringTimeout
		}
		pending.Callback.SetError(failure)
	}

	a.acquirings = nil
}

func (a *AcquireActor) doAcquiring() error {
ctx := context.Background()

@@ -126,24 +143,29 @@ func (a *AcquireActor) doAcquiring() error {

// TODO 可以考虑一次性获得多个锁
for i := 0; i < len(a.acquirings); i++ {
req := a.acquirings[i]

// 测试锁,并获得锁数据
reqData, err := a.providersActor.TestLockRequestAndMakeData(a.acquirings[i].Request)
reqData, err := a.providersActor.TestLockRequestAndMakeData(req.Request)
if err != nil {
a.acquirings[i].LastErr = err
req.LastErr = err
continue
}

nextIndexStr := strconv.FormatInt(index+1, 10)
reqData.ID = nextIndexStr
reqData.SerivceID = a.serviceID
reqData.Reason = req.Request.Reason
reqData.Timestamp = time.Now().Unix()

// 锁成功,提交锁数据
err = a.submitLockRequest(ctx, nextIndexStr, reqData)
if err != nil {
a.acquirings[i].LastErr = err
req.LastErr = err
continue
}

a.acquirings[i].Callback.SetValue(reqData.ID)
req.Callback.SetValue(reqData.ID)
a.acquirings = mylo.RemoveAt(a.acquirings, i)
break
}
@@ -159,7 +181,7 @@ func (a *AcquireActor) submitLockRequest(ctx context.Context, index string, reqD

etcdOps := []clientv3.Op{
clientv3.OpPut(EtcdLockRequestIndex, index),
clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)),
clientv3.OpPut(MakeEtcdLockRequestKey(reqData.ID), string(reqBytes)),
}
txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit()
if err != nil {


+ 9
- 2
pkgs/distlock/internal/lease_actor.go View File

@@ -35,14 +35,14 @@ func (a *LeaseActor) Init(releaseActor *ReleaseActor) {
a.releaseActor = releaseActor
}

func (a *LeaseActor) StartChecking() error {
func (a *LeaseActor) Start() error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
a.ticker = time.NewTicker(time.Second)
return nil
})
}

func (a *LeaseActor) StopChecking() error {
func (a *LeaseActor) Stop() error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
if a.ticker != nil {
a.ticker.Stop()
@@ -91,6 +91,13 @@ func (a *LeaseActor) Remove(reqID string) error {
})
}

// ResetState discards every tracked lease by replacing the lease table with a
// fresh, empty map. The replacement is submitted through the actor's command
// channel with actor.Wait.
//
// The error returned by actor.Wait is discarded here.
func (a *LeaseActor) ResetState() {
	clearLeases := func() error {
		a.leases = make(map[string]*lockRequestLease)
		return nil
	}

	actor.Wait(context.Background(), a.commandChan, clearLeases)
}

func (a *LeaseActor) Serve() error {
cmdChan := a.commandChan.BeginChanReceive()
defer a.commandChan.CloseChanReceive()


+ 30
- 6
pkgs/distlock/internal/models.go View File

@@ -1,9 +1,13 @@
package internal

import "strings"

const (
EtcdLockRequestData = "/distlock/lockRequest/data"
EtcdLockRequestIndex = "/distlock/lockRequest/index"
EtcdLockRequestLock = "/distlock/lockRequest/lock"
EtcdLockRequestDataPrefix = "/distlock/lockRequest/data"
EtcdLockRequestIndex = "/distlock/lockRequest/index"
EtcdLockRequestLock = "/distlock/lockRequest/lock"
EtcdServiceInfoPrefix = "/distlock/services"
EtcdWatchPrefix = "/distlock"
)

type Lock struct {
@@ -13,7 +17,8 @@ type Lock struct {
}

type LockRequest struct {
Locks []Lock
Reason string
Locks []Lock
}

func (b *LockRequest) Add(lock Lock) {
@@ -47,6 +52,25 @@ type lockData struct {
}

type LockRequestData struct {
ID string `json:"id"`
Locks []lockData `json:"locks"`
ID string `json:"id"`
SerivceID string `json:"serviceID"`
Reason string `json:"reason"`
Timestamp int64 `json:"timestamp"`
Locks []lockData `json:"locks"`
}

// MakeEtcdLockRequestKey returns the etcd key under which the lock request
// with the given ID is stored: EtcdLockRequestDataPrefix + "/" + reqID.
func MakeEtcdLockRequestKey(reqID string) string {
	const separator = "/"
	return EtcdLockRequestDataPrefix + separator + reqID
}

// GetLockRequestID extracts the lock request ID from an etcd key by stripping
// the leading EtcdLockRequestDataPrefix + "/". A key that does not start with
// that prefix is returned unchanged.
func GetLockRequestID(key string) string {
	prefix := EtcdLockRequestDataPrefix + "/"
	if !strings.HasPrefix(key, prefix) {
		return key
	}
	return key[len(prefix):]
}

// MakeServiceInfoKey returns the etcd key under which the info of the service
// with the given ID is stored: EtcdServiceInfoPrefix + "/" + svcID.
func MakeServiceInfoKey(svcID string) string {
	const separator = "/"
	return EtcdServiceInfoPrefix + separator + svcID
}

// ServiceInfo is the registration record of a lock service instance. It is
// serialized to JSON and stored in etcd under MakeServiceInfoKey(ID).
type ServiceInfo struct {
// ID identifies the service instance; lock requests carry it so each lock
// can be traced back to the service that submitted it.
ID string `json:"id"`
}

+ 85
- 94
pkgs/distlock/internal/providers_actor.go View File

@@ -2,17 +2,20 @@ package internal

import (
"context"
"errors"
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/trie"
)

var ErrWaitIndexUpdateTimeout = errors.New("waitting local index updating timeout")

type indexWaiter struct {
Index int64
Future *future.SetVoidFuture
Index int64
Callback *future.SetVoidFuture
}

type ProvidersActor struct {
@@ -21,14 +24,11 @@ type ProvidersActor struct {
allProviders []LockProvider

indexWaiters []indexWaiter

commandChan *actor.CommandChannel
lock sync.Mutex
}

func NewProvidersActor() *ProvidersActor {
return &ProvidersActor{
commandChan: actor.NewCommandChannel(),
}
return &ProvidersActor{}
}

func (a *ProvidersActor) AddProvider(prov LockProvider, path ...any) {
@@ -42,46 +42,46 @@ func (a *ProvidersActor) Init() {
func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) error {
fut := future.NewSetVoid()

a.commandChan.Send(func() {
if index <= a.localLockReqIndex {
fut.SetVoid()
} else {
a.indexWaiters = append(a.indexWaiters, indexWaiter{
Index: index,
Future: fut,
})
}
})
a.lock.Lock()
if index <= a.localLockReqIndex {
fut.SetVoid()
} else {
a.indexWaiters = append(a.indexWaiters, indexWaiter{
Index: index,
Callback: fut,
})
}
a.lock.Unlock()

return fut.Wait(ctx)
}

func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) {
a.commandChan.Send(func() {
for _, op := range events {
if op.IsLocking {
err := a.lockLockRequest(op.Data)
if err != nil {
// TODO 发生这种错误需要重新加载全量状态,下同
logger.Std.Warnf("applying locking event: %s", err.Error())
return
}

} else {
err := a.unlockLockRequest(op.Data)
if err != nil {
logger.Std.Warnf("applying unlocking event: %s", err.Error())
return
}
func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) {
func() {
a.lock.Lock()
defer a.lock.Unlock()

if evt.IsLocking {
err := a.lockLockRequest(evt.Data)
if err != nil {
// TODO 发生这种错误需要重新加载全量状态,下同
logger.Std.Warnf("applying locking event: %s", err.Error())
return
}

// 处理了多少事件,Index就往后移动多少个
a.localLockReqIndex++
} else {
err := a.unlockLockRequest(evt.Data)
if err != nil {
logger.Std.Warnf("applying unlocking event: %s", err.Error())
return
}
}

// 检查是否有等待同步进度的需求
a.wakeUpIndexWaiter()
})
a.localLockReqIndex++
}()

// 检查是否有等待同步进度的需求
a.wakeUpIndexWaiter()
}

func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error {
@@ -135,83 +135,74 @@ func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error {
// TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID。
// 在检查单个锁是否能上锁时,不会考虑同一个锁请求中的其他的锁影响。简单来说,就是同一个请求中的锁可以互相冲突。
func (a *ProvidersActor) TestLockRequestAndMakeData(req LockRequest) (LockRequestData, error) {
return actor.WaitValue(context.TODO(), a.commandChan, func() (LockRequestData, error) {
reqData := LockRequestData{}
a.lock.Lock()
defer a.lock.Unlock()

for _, lock := range req.Locks {
n, ok := a.provdersTrie.WalkEnd(lock.Path)
if !ok || n.Value == nil {
return LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path)
}
reqData := LockRequestData{}

err := n.Value.CanLock(lock)
if err != nil {
return LockRequestData{}, err
}
for _, lock := range req.Locks {
n, ok := a.provdersTrie.WalkEnd(lock.Path)
if !ok || n.Value == nil {
return LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path)
}

targetStr, err := n.Value.GetTargetString(lock.Target)
if err != nil {
return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err)
}
err := n.Value.CanLock(lock)
if err != nil {
return LockRequestData{}, err
}

reqData.Locks = append(reqData.Locks, lockData{
Path: lock.Path,
Name: lock.Name,
Target: targetStr,
})
targetStr, err := n.Value.GetTargetString(lock.Target)
if err != nil {
return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err)
}

return reqData, nil
})
reqData.Locks = append(reqData.Locks, lockData{
Path: lock.Path,
Name: lock.Name,
Target: targetStr,
})
}

return reqData, nil
}

// ResetState 重置内部状态
func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestData) error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
for _, p := range a.allProviders {
p.Clear()
}
a.lock.Lock()
defer a.lock.Unlock()

for _, reqData := range lockRequestData {
err := a.lockLockRequest(reqData)
if err != nil {
return fmt.Errorf("lock by lock request data failed, err: %w", err)
}
var err error

for _, p := range a.allProviders {
p.Clear()
}

for _, reqData := range lockRequestData {
err = a.lockLockRequest(reqData)
if err != nil {
err = fmt.Errorf("applying lock request data: %w", err)
break
}
}

a.localLockReqIndex = index
a.localLockReqIndex = index

// 检查是否有等待同步进度的需求
a.wakeUpIndexWaiter()
// 内部状态已被破坏,停止所有监听器
for _, w := range a.indexWaiters {
w.Callback.SetError(ErrWaitIndexUpdateTimeout)
}
a.indexWaiters = nil

return nil
})
return err
}

func (a *ProvidersActor) wakeUpIndexWaiter() {
var resetWaiters []indexWaiter
for _, waiter := range a.indexWaiters {
if waiter.Index <= a.localLockReqIndex {
waiter.Future.SetVoid()
waiter.Callback.SetVoid()
} else {
resetWaiters = append(resetWaiters, waiter)
}
}
a.indexWaiters = resetWaiters
}

func (a *ProvidersActor) Serve() error {
cmdChan := a.commandChan.BeginChanReceive()
defer a.commandChan.CloseChanReceive()

for {
select {
case cmd, ok := <-cmdChan:
if !ok {
return fmt.Errorf("command channel closed")
}

cmd()
}
}
}

+ 30
- 5
pkgs/distlock/internal/release_actor.go View File

@@ -24,7 +24,7 @@ type ReleaseActor struct {

releasingLockRequestIDs map[string]bool
timer *time.Timer
timerSetuped bool
timerSetup bool
lock sync.Mutex
}

@@ -66,6 +66,27 @@ func (a *ReleaseActor) DelayRelease(reqIDs []string) {
a.setupTimer()
}

// ResetState replaces the set of lock requests awaiting release with exactly
// the given IDs, dropping whatever was queued before, and then calls
// setupTimer.
func (a *ReleaseActor) ResetState(reqIDs []string) {
	a.lock.Lock()
	defer a.lock.Unlock()

	pending := make(map[string]bool)
	for i := range reqIDs {
		pending[reqIDs[i]] = true
	}
	a.releasingLockRequestIDs = pending

	a.setupTimer()
}

// OnLockRequestEvent reacts to a lock request event. For an unlock event the
// request ID is removed from the set of requests awaiting release; locking
// events are ignored.
func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) {
	a.lock.Lock()
	defer a.lock.Unlock()

	if event.IsLocking {
		return
	}

	releasedID := event.Data.ID
	delete(a.releasingLockRequestIDs, releasedID)
}

func (a *ReleaseActor) doReleasing() error {
ctx := context.TODO()

@@ -87,7 +108,7 @@ func (a *ReleaseActor) doReleasing() error {

// TODO 可以考虑优化成一次性删除多个锁
for id := range a.releasingLockRequestIDs {
lockReqKey := makeEtcdLockRequestKey(id)
lockReqKey := MakeEtcdLockRequestKey(id)

txResp, err := a.etcdCli.Txn(ctx).
If(clientv3util.KeyExists(lockReqKey)).
@@ -110,10 +131,10 @@ func (a *ReleaseActor) setupTimer() {
return
}

if a.timerSetuped {
if a.timerSetup {
return
}
a.timerSetuped = true
a.timerSetup = true

delay := int64(0)
if a.cfg.RandomReleasingDelayMs == 0 {
@@ -130,7 +151,11 @@ func (a *ReleaseActor) setupTimer() {

go func() {
<-a.timer.C
a.timerSetuped = false

a.lock.Lock()
defer a.lock.Unlock()

a.timerSetup = false

// TODO 处理错误
err := a.doReleasing()


+ 136
- 0
pkgs/distlock/internal/service_info_actor.go View File

@@ -0,0 +1,136 @@
package internal

import (
"context"
"fmt"
"sync"

"github.com/google/uuid"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)

// serviceStatus is the locally tracked state of one registered lock service.
type serviceStatus struct {
// Info is the service's registration record.
Info ServiceInfo
// LockRequestIDs lists the IDs of the lock requests attributed to this
// service.
LockRequestIDs []string
}

// ServiceInfoActor registers this process as a lock service in etcd and keeps
// a table of the other registered services together with the lock requests
// each of them holds.
type ServiceInfoActor struct {
cfg *Config
etcdCli *clientv3.Client
// NOTE(review): releaseActor is dereferenced by OnServiceEvent and
// OnLockRequestEvent, but no assignment to it is visible in this file
// (NewServiceInfoActor does not set it) — confirm it is initialized
// before events are delivered.
releaseActor *ReleaseActor

// lock is taken by ResetState, OnServiceEvent and OnLockRequestEvent.
lock sync.Mutex
// selfInfo is this service's own registration record.
selfInfo ServiceInfo
// leaseID is the etcd lease the registration is attached to; nil when no
// lease has been recorded.
leaseID *clientv3.LeaseID
// services maps a service ID to its tracked status.
services map[string]*serviceStatus
}

// NewServiceInfoActor creates a ServiceInfoActor bound to the given
// configuration and etcd client. All other fields are left at their zero
// values; the service table is only populated by ResetState.
//
// NOTE(review): the releaseActor field is not set here — confirm it is
// assigned elsewhere before OnServiceEvent / OnLockRequestEvent are called.
func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client) *ServiceInfoActor {
	actor := new(ServiceInfoActor)
	actor.cfg = cfg
	actor.etcdCli = etcdCli
	return actor
}

// GetSelfInfo returns a pointer to this service's own registration record.
// The pointer refers to the actor's internal field, not a copy, and is
// obtained without taking the actor's lock.
func (a *ServiceInfoActor) GetSelfInfo() *ServiceInfo {
	self := &a.selfInfo
	return self
}

// ResetState re-registers this service in etcd under a freshly generated ID
// and rebuilds the local table of services and the lock requests they hold.
//
// Steps:
//  1. Revoke the lease recorded by a previous registration, if any.
//  2. Generate a new service ID, grant a lease with a TTL of
//     cfg.EtcdLockLeaseTimeSec and put the service info under
//     MakeServiceInfoKey(ID), attached to that lease.
//  3. Replace the service table with currentServices and attribute every
//     entry of currentLocks to its owning service.
//
// It returns the IDs of the lock requests in currentLocks whose service ID
// matches none of currentServices, so the caller can release them. On error
// no lease is recorded on the actor and the service table is left untouched.
//
// NOTE(review): no KeepAlive is issued for the granted lease in this
// function; presumably it is renewed elsewhere — confirm, otherwise the
// registration expires after the TTL.
func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []ServiceInfo, currentLocks []LockRequestData) ([]string, error) {
	a.lock.Lock()
	defer a.lock.Unlock()

	// Drop the previous registration. Revocation is best-effort: if it fails
	// the old lease simply runs out on its own TTL.
	if a.leaseID != nil {
		_, _ = a.etcdCli.Revoke(ctx, *a.leaseID)
		a.leaseID = nil
	}

	// Generate and register the service info.
	a.selfInfo.ID = uuid.NewString()

	infoData, err := serder.ObjectToJSON(a.selfInfo)
	if err != nil {
		return nil, fmt.Errorf("service info to json: %w", err)
	}

	lease, err := a.etcdCli.Grant(ctx, a.cfg.EtcdLockLeaseTimeSec)
	if err != nil {
		return nil, fmt.Errorf("granting lease: %w", err)
	}

	_, err = a.etcdCli.Put(ctx, MakeServiceInfoKey(a.selfInfo.ID), string(infoData), clientv3.WithLease(lease.ID))
	if err != nil {
		// Best-effort cleanup of the lease that will never be used.
		_, _ = a.etcdCli.Revoke(ctx, lease.ID)
		return nil, fmt.Errorf("putting service info to etcd: %w", err)
	}

	// Record the lease only once the registration is in place, so a failed
	// Put never leaves the actor pointing at an already revoked lease.
	leaseID := lease.ID
	a.leaseID = &leaseID

	// Import the currently registered services.
	a.services = make(map[string]*serviceStatus, len(currentServices))
	for _, svc := range currentServices {
		a.services[svc.ID] = &serviceStatus{
			Info: svc,
		}
	}

	// While importing the locks we may find some that belong to a service
	// with no registration; collect those so they can be released.
	var willReleaseIDs []string
	for _, req := range currentLocks {
		svc, ok := a.services[req.SerivceID]
		if !ok {
			willReleaseIDs = append(willReleaseIDs, req.ID)
			continue
		}

		svc.LockRequestIDs = append(svc.LockRequestIDs, req.ID)
	}

	return willReleaseIDs, nil
}

// OnServiceEvent updates the service table in response to a service
// registration event.
//
// A new service is added to the table with an empty lock list. When a known
// service disappears, the lock requests attributed to it are handed to the
// release actor via DelayRelease and the service is removed from the table;
// a removal event for an unknown service is ignored.
func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) {
	a.lock.Lock()
	defer a.lock.Unlock()

	// TODO consider emitting some log output here

	svcID := evt.Info.ID

	if evt.IsNew {
		a.services[svcID] = &serviceStatus{
			Info: evt.Info,
		}
		return
	}

	gone, known := a.services[svcID]
	if !known {
		return
	}

	a.releaseActor.DelayRelease(gone.LockRequestIDs)

	delete(a.services, svcID)

	// TODO handle receiving the notification that this service itself went down
}

// OnLockRequestEvent keeps the per-service lock lists in sync with lock
// request events.
//
// For a registered service, a locking event appends the request ID to the
// service's list and an unlocking event removes it. A locking event from a
// service that is not registered causes the lock to be released immediately;
// an unlocking event from such a service is ignored, because the lock it
// refers to has already been removed.
func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) {
	a.lock.Lock()
	defer a.lock.Unlock()

	status, ok := a.services[evt.Data.SerivceID]
	if !ok {
		// The lock was taken by a service with no registration. Most likely
		// that service suffered a network disruption during which its
		// registration expired and was removed. To keep such a service from
		// crashing afterwards and leaving the lock held forever, release any
		// lock it takes right away.
		//
		// Only locking events need this: on an unlocking event the key is
		// already gone, and re-queuing the ID would only trigger a pointless
		// release attempt.
		if evt.IsLocking {
			a.releaseActor.Release([]string{evt.Data.ID})
		}
		return
	}

	if evt.IsLocking {
		status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID)
	} else {
		status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID)
	}
}

+ 0
- 116
pkgs/distlock/internal/utils.go View File

@@ -1,116 +0,0 @@
package internal

import (
"strings"
)

func makeEtcdLockRequestKey(reqID string) string {
return EtcdLockRequestData + "/" + reqID
}

func getLockRequestID(key string) string {
return strings.TrimPrefix(key, EtcdLockRequestData+"/")
}

/*
func parseLockData(str string) (lock lockData, err error) {
sb := strings.Builder{}
var comps []string

escaping := false
for _, ch := range strings.TrimSpace(str) {
if escaping {
if ch == 'n' {
sb.WriteRune('\n')
} else {
sb.WriteRune(ch)
}

escaping = false
continue
}

if ch == '/' {
comps = append(comps, sb.String())
sb.Reset()
} else if ch == '\\' {
escaping = true
} else {
sb.WriteRune(ch)
}
}

comps = append(comps, sb.String())

if len(comps) < 3 {
return lockData{}, fmt.Errorf("string must includes 3 components devided by /")
}

return lockData{
Path: comps[0 : len(comps)-2],
Name: comps[len(comps)-2],
Target: comps[len(comps)-1],
}, nil
}

func lockDataToString(lock lockData) string {
sb := strings.Builder{}

for _, s := range lock.Path {
sb.WriteString(lockDataEncoding(s))
sb.WriteRune('/')
}

sb.WriteString(lockDataEncoding(lock.Name))
sb.WriteRune('/')

sb.WriteString(lockDataEncoding(lock.Target))

return sb.String()
}

func lockDataEncoding(str string) string {
sb := strings.Builder{}

for _, ch := range str {
if ch == '\\' {
sb.WriteString("\\\\")
} else if ch == '/' {
sb.WriteString("\\/")
} else if ch == '\n' {
sb.WriteString("\\n")
} else {
sb.WriteRune(ch)
}
}

return sb.String()
}

func lockDataDecoding(str string) string {
sb := strings.Builder{}

escaping := false
for _, ch := range str {
if escaping {
if ch == 'n' {
sb.WriteRune('\n')
} else {
sb.WriteRune(ch)
}

escaping = false
continue
}

if ch == '\\' {
escaping = true

} else {
sb.WriteRune(ch)
}
}

return sb.String()
}
*/

+ 0
- 61
pkgs/distlock/internal/utils_test.go View File

@@ -1,61 +0,0 @@
package internal

/*
import (
. "github.com/smartystreets/goconvey/convey"
)

func Test_parseLockData_lockDataToString(t *testing.T) {
cases := []struct {
title string
data lockData
}{
{
title: "多段路径",
data: lockData{
Path: []string{"a", "b", "c"},
Name: "d",
Target: "e",
},
},

{
title: "包含分隔符",
data: lockData{
Path: []string{"a/", "b", "c/c"},
Name: "/d",
Target: "///e//d/",
},
},

{
title: "包含转义符",
data: lockData{
Path: []string{"a\\/", "b", "\\c/c"},
Name: "/d",
Target: "///e\\//d/\\",
},
},

{
title: "包含换行符",
data: lockData{
Path: []string{"a\n", "\nb", "c\nc"},
Name: "/d",
Target: "e\nd\n",
},
},
}

for _, ca := range cases {
Convey(ca.title, t, func() {
str := lockDataToString(ca.data)

data, err := parseLockData(str)

So(err, ShouldBeNil)
So(data, ShouldResemble, ca.data)
})
}
}
*/

+ 102
- 62
pkgs/distlock/internal/watch_etcd_actor.go View File

@@ -3,9 +3,9 @@ package internal
import (
"context"
"fmt"
"strings"

"gitlink.org.cn/cloudream/common/pkgs/actor"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)
@@ -15,16 +15,23 @@ type LockRequestEvent struct {
Data LockRequestData
}

type LockRequestEventWatcher struct {
OnEvent func(events []LockRequestEvent)
type ServiceEvent struct {
IsNew bool
Info ServiceInfo
}

// OnLockRequestEventFn is the callback signature for receiving a single lock
// request event (a lock being taken or released).
type OnLockRequestEventFn func(event LockRequestEvent)

// OnServiceEventFn is the callback signature for receiving a single service
// event (a service registration appearing or disappearing).
type OnServiceEventFn func(event ServiceEvent)

type WatchEtcdActor struct {
etcdCli *clientv3.Client
watchChan clientv3.WatchChan
lockReqWatchers []*LockRequestEventWatcher
etcdCli *clientv3.Client

commandChan *actor.CommandChannel
watchChan clientv3.WatchChan
watchChanCancel func()
onLockRequestEventFn OnLockRequestEventFn
onServiceEventFn OnServiceEventFn
commandChan *actor.CommandChannel
}

func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor {
@@ -34,33 +41,32 @@ func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor {
}
}

func (a *WatchEtcdActor) Init() {
func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn) {
a.onLockRequestEventFn = onLockRequestEvent
a.onServiceEventFn = onServiceDown
}

func (a *WatchEtcdActor) StartWatching(revision int64) error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
return nil
})
}

func (a *WatchEtcdActor) StopWatching() error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
a.watchChan = nil
return nil
})
}
func (a *WatchEtcdActor) Start(revision int64) {
actor.Wait(context.Background(), a.commandChan, func() error {
if a.watchChanCancel != nil {
a.watchChanCancel()
a.watchChanCancel = nil
}

func (a *WatchEtcdActor) AddEventWatcher(watcher *LockRequestEventWatcher) error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
a.lockReqWatchers = append(a.lockReqWatchers, watcher)
ctx, cancel := context.WithCancel(context.Background())
a.watchChan = a.etcdCli.Watch(ctx, EtcdWatchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
a.watchChanCancel = cancel
return nil
})
}

func (a *WatchEtcdActor) RemoveEventWatcher(watcher *LockRequestEventWatcher) error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
a.lockReqWatchers = mylo.Remove(a.lockReqWatchers, watcher)
func (a *WatchEtcdActor) Stop() {
actor.Wait(context.Background(), a.commandChan, func() error {
if a.watchChanCancel != nil {
a.watchChanCancel()
a.watchChanCancel = nil
}
a.watchChan = nil
return nil
})
}
@@ -85,14 +91,10 @@ func (a *WatchEtcdActor) Serve() error {
return fmt.Errorf("watch etcd channel closed")
}

events, err := a.parseEvents(msg)
err := a.dispatchEtcdEvent(msg)
if err != nil {
// TODO 更好的错误处理
return fmt.Errorf("parse etcd lock request data failed, err: %w", err)
}

for _, w := range a.lockReqWatchers {
w.OnEvent(events)
return err
}
}

@@ -109,41 +111,79 @@ func (a *WatchEtcdActor) Serve() error {
}
}

func (a *WatchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]LockRequestEvent, error) {
var events []LockRequestEvent

func (a *WatchEtcdActor) dispatchEtcdEvent(watchResp clientv3.WatchResponse) error {
for _, e := range watchResp.Events {
key := string(e.Kv.Key)

shouldParseData := false
isLocking := true
var valueData []byte

// 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index
if e.Type == clientv3.EventTypeDelete {
shouldParseData = true
isLocking = false
valueData = e.PrevKv.Value
} else if e.IsCreate() {
shouldParseData = true
isLocking = true
valueData = e.Kv.Value
}
if strings.HasPrefix(key, EtcdLockRequestDataPrefix) {
if err := a.applyLockRequestEvent(e); err != nil {
return fmt.Errorf("parsing lock request event: %w", err)
}

if !shouldParseData {
continue
} else if strings.HasPrefix(key, EtcdServiceInfoPrefix) {
if err := a.applyServiceEvent(e); err != nil {
return fmt.Errorf("parsing service event: %w", err)
}
}
}

var reqData LockRequestData
err := serder.JSONToObject(valueData, &reqData)
if err != nil {
return nil, fmt.Errorf("parse lock request data failed, err: %w", err)
}
return nil
}

func (a *WatchEtcdActor) applyLockRequestEvent(evt *clientv3.Event) error {
isLocking := true
var valueData []byte

// 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index
if evt.Type == clientv3.EventTypeDelete {
isLocking = false
valueData = evt.PrevKv.Value
} else if evt.IsCreate() {
isLocking = true
valueData = evt.Kv.Value
} else {
return nil
}

var reqData LockRequestData
err := serder.JSONToObject(valueData, &reqData)
if err != nil {
return fmt.Errorf("parse lock request data failed, err: %w", err)
}

events = append(events, LockRequestEvent{
IsLocking: isLocking,
Data: reqData,
})
a.onLockRequestEventFn(LockRequestEvent{
IsLocking: isLocking,
Data: reqData,
})

return nil
}

func (a *WatchEtcdActor) applyServiceEvent(evt *clientv3.Event) error {
isNew := true
var valueData []byte

// 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index
if evt.Type == clientv3.EventTypeDelete {
isNew = false
valueData = evt.PrevKv.Value
} else if evt.IsCreate() {
isNew = true
valueData = evt.Kv.Value
} else {
return nil
}

return events, nil
var svcInfo ServiceInfo
err := serder.JSONToObject(valueData, &svcInfo)
if err != nil {
return fmt.Errorf("parsing service info: %w", err)
}

a.onServiceEventFn(ServiceEvent{
IsNew: isNew,
Info: svcInfo,
})

return nil
}

+ 77
- 65
pkgs/distlock/service.go View File

@@ -47,13 +47,12 @@ type Service struct {
cfg *internal.Config
etcdCli *clientv3.Client

acquireActor *internal.AcquireActor
releaseActor *internal.ReleaseActor
providersActor *internal.ProvidersActor
watchEtcdActor *internal.WatchEtcdActor
leaseActor *internal.LeaseActor

lockReqEventWatcher internal.LockRequestEventWatcher
acquireActor *internal.AcquireActor
releaseActor *internal.ReleaseActor
providersActor *internal.ProvidersActor
watchEtcdActor *internal.WatchEtcdActor
leaseActor *internal.LeaseActor
serviceInfoActor *internal.ServiceInfoActor
}

func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error) {
@@ -72,24 +71,36 @@ func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error
providersActor := internal.NewProvidersActor()
watchEtcdActor := internal.NewWatchEtcdActor(etcdCli)
leaseActor := internal.NewLeaseActor()
serviceInfoActor := internal.NewServiceInfoActor(cfg, etcdCli)

acquireActor.Init(providersActor)
providersActor.Init()
watchEtcdActor.Init()
leaseActor.Init(releaseActor)
providersActor.Init()
watchEtcdActor.Init(
func(event internal.LockRequestEvent) {
providersActor.OnLockRequestEvent(event)
acquireActor.TryAcquireNow()
releaseActor.OnLockRequestEvent(event)
serviceInfoActor.OnLockRequestEvent(event)
},
func(event internal.ServiceEvent) {
serviceInfoActor.OnServiceEvent(event)
},
)

for _, prov := range initProvs {
providersActor.AddProvider(prov.Provider, prov.Path...)
}

return &Service{
cfg: cfg,
etcdCli: etcdCli,
acquireActor: acquireActor,
releaseActor: releaseActor,
providersActor: providersActor,
watchEtcdActor: watchEtcdActor,
leaseActor: leaseActor,
cfg: cfg,
etcdCli: etcdCli,
acquireActor: acquireActor,
releaseActor: releaseActor,
providersActor: providersActor,
watchEtcdActor: watchEtcdActor,
leaseActor: leaseActor,
serviceInfoActor: serviceInfoActor,
}, nil
}

@@ -147,14 +158,6 @@ func (svc *Service) Serve() error {
// 1. client退出时直接中断进程,此时AcquireActor可能正在进行重试,于是导致Etcd锁没有解除就退出了进程。
// 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用

go func() {
// TODO 处理错误
err := svc.providersActor.Serve()
if err != nil {
logger.Std.Warnf("serving providers actor failed, err: %s", err.Error())
}
}()

go func() {
// TODO 处理错误
err := svc.watchEtcdActor.Serve()
@@ -171,34 +174,13 @@ func (svc *Service) Serve() error {
}
}()

revision, err := svc.loadState()
// TODO context
err := svc.resetState(context.Background())
if err != nil {
// TODO 关闭其他的Actor,或者更好的错误处理方式
return fmt.Errorf("init data failed, err: %w", err)
}

svc.lockReqEventWatcher.OnEvent = func(events []internal.LockRequestEvent) {
svc.acquireActor.TryAcquireNow()
svc.providersActor.ApplyLockRequestEvents(events)
}
err = svc.watchEtcdActor.AddEventWatcher(&svc.lockReqEventWatcher)
if err != nil {
// TODO 关闭其他的Actor,或者更好的错误处理方式
return fmt.Errorf("add lock request event watcher failed, err: %w", err)
}

err = svc.watchEtcdActor.StartWatching(revision)
if err != nil {
// TODO 关闭其他的Actor,或者更好的错误处理方式
return fmt.Errorf("start watching etcd failed, err: %w", err)
}

err = svc.leaseActor.StartChecking()
if err != nil {
// TODO 关闭其他的Actor,或者更好的错误处理方式
return fmt.Errorf("start checking lease failed, err: %w", err)
}

// TODO 防止退出的临时解决办法
ch := make(chan any)
<-ch
@@ -206,51 +188,81 @@ func (svc *Service) Serve() error {
return nil
}

func (svc *Service) loadState() (int64, error) {
// 使用事务一次性获取index和锁数据,就不需要加全局锁了
txResp, err := svc.etcdCli.Txn(context.Background()).
// ResetState 重置内部状态。注:只要调用到了此函数,无论在哪一步出的错,
// 都要将内部状态视为已被破坏,直到成功调用了此函数才能继续后面的步骤
func (svc *Service) resetState(ctx context.Context) error {
// 必须使用事务一次性获取所有数据
txResp, err := svc.etcdCli.Txn(ctx).
Then(
clientv3.OpGet(internal.EtcdLockRequestIndex),
clientv3.OpGet(internal.EtcdLockRequestData, clientv3.WithPrefix()),
clientv3.OpGet(internal.EtcdLockRequestDataPrefix, clientv3.WithPrefix()),
clientv3.OpGet(internal.EtcdServiceInfoPrefix, clientv3.WithPrefix()),
).
Commit()
if err != nil {
return 0, fmt.Errorf("get etcd data failed, err: %w", err)
return fmt.Errorf("getting etcd data: %w", err)
}

indexKvs := txResp.Responses[0].GetResponseRange().Kvs
lockKvs := txResp.Responses[1].GetResponseRange().Kvs

var index int64
var reqData []internal.LockRequestData

// 解析Index
var index int64 = 0
indexKvs := txResp.Responses[0].GetResponseRange().Kvs
if len(indexKvs) > 0 {
val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64)
if err != nil {
return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
return fmt.Errorf("parsing lock request index: %w", err)
}
index = val

} else {
index = 0
}

// 解析锁请求数据
var reqData []internal.LockRequestData
lockKvs := txResp.Responses[1].GetResponseRange().Kvs
for _, kv := range lockKvs {
var req internal.LockRequestData
err := serder.JSONToObject(kv.Value, &req)
if err != nil {
return 0, fmt.Errorf("parse lock request data failed, err: %w", err)
return fmt.Errorf("parsing lock request data: %w", err)
}

reqData = append(reqData, req)
}

// 解析服务信息数据
var svcInfo []internal.ServiceInfo
svcInfoKvs := txResp.Responses[2].GetResponseRange().Kvs
for _, kv := range svcInfoKvs {
var info internal.ServiceInfo
err := serder.JSONToObject(kv.Value, &info)
if err != nil {
return fmt.Errorf("parsing service info data: %w", err)
}

svcInfo = append(svcInfo, info)
}

// 先停止监听等定时事件
svc.watchEtcdActor.Stop()
svc.leaseActor.Stop()

// 然后将新获取到的状态装填到Actor中。注:执行顺序需要考虑Actor会被谁调用,不会被调用的优先Reset。
releasingIDs, err := svc.serviceInfoActor.ResetState(ctx, svcInfo, reqData)
if err != nil {
return fmt.Errorf("reseting service info actor: %w", err)
}

svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID)

svc.leaseActor.ResetState()

err = svc.providersActor.ResetState(index, reqData)
if err != nil {
return 0, fmt.Errorf("reset lock providers state failed, err: %w", err)
return fmt.Errorf("reseting providers actor: %w", err)
}

return txResp.Header.Revision, nil
svc.releaseActor.ResetState(releasingIDs)

// 重置完了之后再启动监听
svc.watchEtcdActor.Start(txResp.Header.Revision)
svc.leaseActor.Start()
return nil
}

Loading…
Cancel
Save