diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go index 770ad9c..28e2bb6 100644 --- a/pkgs/distlock/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "sync" + "time" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -28,6 +29,7 @@ type AcquireActor struct { etcdCli *clientv3.Client providersActor *ProvidersActor + serviceID string acquirings []*acquireInfo lock sync.Mutex } @@ -98,6 +100,21 @@ func (a *AcquireActor) TryAcquireNow() { }() } +func (a *AcquireActor) ResetState(serviceID string) { + a.lock.Lock() + defer a.lock.Unlock() + + a.serviceID = serviceID + for _, info := range a.acquirings { + if info.LastErr != nil { + info.Callback.SetError(info.LastErr) + } else { + info.Callback.SetError(ErrAcquiringTimeout) + } + } + a.acquirings = nil +} + func (a *AcquireActor) doAcquiring() error { ctx := context.Background() @@ -126,24 +143,29 @@ func (a *AcquireActor) doAcquiring() error { // TODO 可以考虑一次性获得多个锁 for i := 0; i < len(a.acquirings); i++ { + req := a.acquirings[i] + // 测试锁,并获得锁数据 - reqData, err := a.providersActor.TestLockRequestAndMakeData(a.acquirings[i].Request) + reqData, err := a.providersActor.TestLockRequestAndMakeData(req.Request) if err != nil { - a.acquirings[i].LastErr = err + req.LastErr = err continue } nextIndexStr := strconv.FormatInt(index+1, 10) reqData.ID = nextIndexStr + reqData.SerivceID = a.serviceID + reqData.Reason = req.Request.Reason + reqData.Timestamp = time.Now().Unix() // 锁成功,提交锁数据 err = a.submitLockRequest(ctx, nextIndexStr, reqData) if err != nil { - a.acquirings[i].LastErr = err + req.LastErr = err continue } - a.acquirings[i].Callback.SetValue(reqData.ID) + req.Callback.SetValue(reqData.ID) a.acquirings = mylo.RemoveAt(a.acquirings, i) break } @@ -159,7 +181,7 @@ func (a *AcquireActor) submitLockRequest(ctx context.Context, index string, reqD etcdOps := []clientv3.Op{ clientv3.OpPut(EtcdLockRequestIndex, index), - clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), + clientv3.OpPut(MakeEtcdLockRequestKey(reqData.ID), string(reqBytes)), } txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit() if err != nil { diff --git a/pkgs/distlock/internal/lease_actor.go b/pkgs/distlock/internal/lease_actor.go index 132430a..5b6045f 100644 --- a/pkgs/distlock/internal/lease_actor.go +++ b/pkgs/distlock/internal/lease_actor.go @@ -35,14 +35,14 @@ func (a *LeaseActor) Init(releaseActor *ReleaseActor) { a.releaseActor = releaseActor } -func (a *LeaseActor) StartChecking() error { +func (a *LeaseActor) Start() error { return actor.Wait(context.TODO(), a.commandChan, func() error { a.ticker = time.NewTicker(time.Second) return nil }) } -func (a *LeaseActor) StopChecking() error { +func (a *LeaseActor) Stop() error { return actor.Wait(context.TODO(), a.commandChan, func() error { if a.ticker != nil { a.ticker.Stop() @@ -91,6 +91,13 @@ func (a *LeaseActor) Remove(reqID string) error { }) } +func (a *LeaseActor) ResetState() { + actor.Wait(context.Background(), a.commandChan, func() error { + a.leases = make(map[string]*lockRequestLease) + return nil + }) +} + func (a *LeaseActor) Serve() error { cmdChan := a.commandChan.BeginChanReceive() defer a.commandChan.CloseChanReceive() diff --git a/pkgs/distlock/internal/models.go b/pkgs/distlock/internal/models.go index b0c7eeb..08eb593 100644 --- a/pkgs/distlock/internal/models.go +++ b/pkgs/distlock/internal/models.go @@ -1,9 +1,13 @@ package internal +import "strings" + const ( - EtcdLockRequestData = "/distlock/lockRequest/data" - EtcdLockRequestIndex = "/distlock/lockRequest/index" - EtcdLockRequestLock = "/distlock/lockRequest/lock" + EtcdLockRequestDataPrefix = "/distlock/lockRequest/data" + EtcdLockRequestIndex = "/distlock/lockRequest/index" + EtcdLockRequestLock = "/distlock/lockRequest/lock" + EtcdServiceInfoPrefix = "/distlock/services" + EtcdWatchPrefix = "/distlock" ) type Lock struct { @@ -13,7 +17,8 @@ type Lock struct { } type LockRequest struct { - Locks []Lock + Reason string + Locks []Lock } func (b *LockRequest) Add(lock Lock) { @@ -47,6 +52,25 @@ type lockData struct { } type LockRequestData struct { - ID string `json:"id"` - Locks []lockData `json:"locks"` + ID string `json:"id"` + SerivceID string `json:"serviceID"` + Reason string `json:"reason"` + Timestamp int64 `json:"timestamp"` + Locks []lockData `json:"locks"` +} + +func MakeEtcdLockRequestKey(reqID string) string { + return EtcdLockRequestDataPrefix + "/" + reqID +} + +func GetLockRequestID(key string) string { + return strings.TrimPrefix(key, EtcdLockRequestDataPrefix+"/") +} + +func MakeServiceInfoKey(svcID string) string { + return EtcdServiceInfoPrefix + "/" + svcID +} + +type ServiceInfo struct { + ID string `json:"id"` } diff --git a/pkgs/distlock/internal/providers_actor.go b/pkgs/distlock/internal/providers_actor.go index 23be57e..c9f426c 100644 --- a/pkgs/distlock/internal/providers_actor.go +++ b/pkgs/distlock/internal/providers_actor.go @@ -2,17 +2,20 @@ package internal import ( "context" + "errors" "fmt" + "sync" - "gitlink.org.cn/cloudream/common/pkgs/actor" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" ) +var ErrWaitIndexUpdateTimeout = errors.New("waitting local index updating timeout") + type indexWaiter struct { - Index int64 - Future *future.SetVoidFuture + Index int64 + Callback *future.SetVoidFuture } type ProvidersActor struct { @@ -21,14 +24,11 @@ type ProvidersActor struct { allProviders []LockProvider indexWaiters []indexWaiter - - commandChan *actor.CommandChannel + lock sync.Mutex } func NewProvidersActor() *ProvidersActor { - return &ProvidersActor{ - commandChan: actor.NewCommandChannel(), - } + return &ProvidersActor{} } func (a *ProvidersActor) AddProvider(prov LockProvider, path ...any) { @@ -42,46 +42,46 @@ func (a *ProvidersActor) Init() { func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) error { fut := future.NewSetVoid() - a.commandChan.Send(func() { - if index <= a.localLockReqIndex { - fut.SetVoid() - } else { - a.indexWaiters = append(a.indexWaiters, indexWaiter{ - Index: index, - Future: fut, - }) - } - }) + a.lock.Lock() + if index <= a.localLockReqIndex { + fut.SetVoid() + } else { + a.indexWaiters = append(a.indexWaiters, indexWaiter{ + Index: index, + Callback: fut, + }) + } + a.lock.Unlock() return fut.Wait(ctx) } -func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) { - a.commandChan.Send(func() { - for _, op := range events { - if op.IsLocking { - err := a.lockLockRequest(op.Data) - if err != nil { - // TODO 发生这种错误需要重新加载全量状态,下同 - logger.Std.Warnf("applying locking event: %s", err.Error()) - return - } - - } else { - err := a.unlockLockRequest(op.Data) - if err != nil { - logger.Std.Warnf("applying unlocking event: %s", err.Error()) - return - } +func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) { + func() { + a.lock.Lock() + defer a.lock.Unlock() + + if evt.IsLocking { + err := a.lockLockRequest(evt.Data) + if err != nil { + // TODO 发生这种错误需要重新加载全量状态,下同 + logger.Std.Warnf("applying locking event: %s", err.Error()) + return } - // 处理了多少事件,Index就往后移动多少个 - a.localLockReqIndex++ + } else { + err := a.unlockLockRequest(evt.Data) + if err != nil { + logger.Std.Warnf("applying unlocking event: %s", err.Error()) + return + } } - // 检查是否有等待同步进度的需求 - a.wakeUpIndexWaiter() - }) + a.localLockReqIndex++ + }() + + // 检查是否有等待同步进度的需求 + a.wakeUpIndexWaiter() } func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error { @@ -135,83 +135,74 @@ func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error { // TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID。 // 在检查单个锁是否能上锁时,不会考虑同一个锁请求中的其他的锁影响。简单来说,就是同一个请求中的锁可以互相冲突。 func (a *ProvidersActor) TestLockRequestAndMakeData(req LockRequest) (LockRequestData, error) { - return actor.WaitValue(context.TODO(), a.commandChan, func() (LockRequestData, error) { - reqData := LockRequestData{} + a.lock.Lock() + defer a.lock.Unlock() - for _, lock := range req.Locks { - n, ok := a.provdersTrie.WalkEnd(lock.Path) - if !ok || n.Value == nil { - return LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path) - } + reqData := LockRequestData{} - err := n.Value.CanLock(lock) - if err != nil { - return LockRequestData{}, err - } + for _, lock := range req.Locks { + n, ok := a.provdersTrie.WalkEnd(lock.Path) + if !ok || n.Value == nil { + return LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path) + } - targetStr, err := n.Value.GetTargetString(lock.Target) - if err != nil { - return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err) - } + err := n.Value.CanLock(lock) + if err != nil { + return LockRequestData{}, err + } - reqData.Locks = append(reqData.Locks, lockData{ - Path: lock.Path, - Name: lock.Name, - Target: targetStr, - }) + targetStr, err := n.Value.GetTargetString(lock.Target) + if err != nil { + return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err) } - return reqData, nil - }) + reqData.Locks = append(reqData.Locks, lockData{ + Path: lock.Path, + Name: lock.Name, + Target: targetStr, + }) + } + + return reqData, nil } -// ResetState 重置内部状态 func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestData) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - for _, p := range a.allProviders { - p.Clear() - } + a.lock.Lock() + defer a.lock.Unlock() - for _, reqData := range lockRequestData { - err := a.lockLockRequest(reqData) - if err != nil { - return fmt.Errorf("lock by lock request data failed, err: %w", err) - } + var err error + + for _, p := range a.allProviders { + p.Clear() + } + + for _, reqData := range lockRequestData { + err = a.lockLockRequest(reqData) + if err != nil { + err = fmt.Errorf("applying lock request data: %w", err) + break } + } - a.localLockReqIndex = index + a.localLockReqIndex = index - // 检查是否有等待同步进度的需求 - a.wakeUpIndexWaiter() + // 内部状态已被破坏,停止所有监听器 + for _, w := range a.indexWaiters { + w.Callback.SetError(ErrWaitIndexUpdateTimeout) + } + a.indexWaiters = nil - return nil - }) + return err } func (a *ProvidersActor) wakeUpIndexWaiter() { var resetWaiters []indexWaiter for _, waiter := range a.indexWaiters { if waiter.Index <= a.localLockReqIndex { - waiter.Future.SetVoid() + waiter.Callback.SetVoid() } else { resetWaiters = append(resetWaiters, waiter) } } a.indexWaiters = resetWaiters } - -func (a *ProvidersActor) Serve() error { - cmdChan := a.commandChan.BeginChanReceive() - defer a.commandChan.CloseChanReceive() - - for { - select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - - cmd() - } - } -} diff --git a/pkgs/distlock/internal/release_actor.go b/pkgs/distlock/internal/release_actor.go index 4105350..fea93a9 100644 --- a/pkgs/distlock/internal/release_actor.go +++ b/pkgs/distlock/internal/release_actor.go @@ -24,7 +24,7 @@ type ReleaseActor struct { releasingLockRequestIDs map[string]bool timer *time.Timer - timerSetuped bool + timerSetup bool lock sync.Mutex } @@ -66,6 +66,27 @@ func (a *ReleaseActor) DelayRelease(reqIDs []string) { a.setupTimer() } +func (a *ReleaseActor) ResetState(reqIDs []string) { + a.lock.Lock() + defer a.lock.Unlock() + + a.releasingLockRequestIDs = make(map[string]bool) + for _, id := range reqIDs { + a.releasingLockRequestIDs[id] = true + } + + a.setupTimer() +} + +func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) { + a.lock.Lock() + defer a.lock.Unlock() + + if !event.IsLocking { + delete(a.releasingLockRequestIDs, event.Data.ID) + } +} + func (a *ReleaseActor) doReleasing() error { ctx := context.TODO() @@ -87,7 +108,7 @@ func (a *ReleaseActor) doReleasing() error { // TODO 可以考虑优化成一次性删除多个锁 for id := range a.releasingLockRequestIDs { - lockReqKey := makeEtcdLockRequestKey(id) + lockReqKey := MakeEtcdLockRequestKey(id) txResp, err := a.etcdCli.Txn(ctx). If(clientv3util.KeyExists(lockReqKey)). @@ -110,10 +131,10 @@ func (a *ReleaseActor) setupTimer() { return } - if a.timerSetuped { + if a.timerSetup { return } - a.timerSetuped = true + a.timerSetup = true delay := int64(0) if a.cfg.RandomReleasingDelayMs == 0 { @@ -130,7 +151,11 @@ func (a *ReleaseActor) setupTimer() { go func() { <-a.timer.C - a.timerSetuped = false + + a.lock.Lock() + defer a.lock.Unlock() + + a.timerSetup = false // TODO 处理错误 err := a.doReleasing() diff --git a/pkgs/distlock/internal/service_info_actor.go b/pkgs/distlock/internal/service_info_actor.go new file mode 100644 index 0000000..baad890 --- /dev/null +++ b/pkgs/distlock/internal/service_info_actor.go @@ -0,0 +1,136 @@ +package internal + +import ( + "context" + "fmt" + "sync" + + "github.com/google/uuid" + mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/serder" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type serviceStatus struct { + Info ServiceInfo + LockRequestIDs []string +} + +type ServiceInfoActor struct { + cfg *Config + etcdCli *clientv3.Client + releaseActor *ReleaseActor + + lock sync.Mutex + selfInfo ServiceInfo + leaseID *clientv3.LeaseID + services map[string]*serviceStatus +} + +func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client) *ServiceInfoActor { + return &ServiceInfoActor{ + cfg: cfg, + etcdCli: etcdCli, + } +} + +func (a *ServiceInfoActor) GetSelfInfo() *ServiceInfo { + return &a.selfInfo +} + +func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []ServiceInfo, currentLocks []LockRequestData) ([]string, error) { + a.lock.Lock() + defer a.lock.Unlock() + + if a.leaseID != nil { + a.etcdCli.Revoke(ctx, *a.leaseID) + a.leaseID = nil + } + + // 生成并注册服务信息 + a.selfInfo.ID = uuid.NewString() + + infoData, err := serder.ObjectToJSON(a.selfInfo) + if err != nil { + return nil, fmt.Errorf("service info to json: %w", err) + } + + lease, err := a.etcdCli.Grant(ctx, a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return nil, fmt.Errorf("granting lease: %w", err) + } + + a.leaseID = &lease.ID + + _, err = a.etcdCli.Put(ctx, MakeServiceInfoKey(a.selfInfo.ID), string(infoData), clientv3.WithLease(lease.ID)) + if err != nil { + a.etcdCli.Revoke(ctx, lease.ID) + return nil, fmt.Errorf("putting service info to etcd: %w", err) + } + + // 导入当前已有的服务信息和锁信息 + a.services = make(map[string]*serviceStatus) + for _, svc := range currentServices { + a.services[svc.ID] = &serviceStatus{ + Info: svc, + } + } + + // 导入锁信息的过程中可能会发现未注册信息的锁服务的锁,把他们挑出来释放掉 + var willReleaseIDs []string + for _, lock := range currentLocks { + svc, ok := a.services[lock.SerivceID] + if !ok { + willReleaseIDs = append(willReleaseIDs, lock.ID) + continue + } + + svc.LockRequestIDs = append(svc.LockRequestIDs, lock.ID) + } + + return willReleaseIDs, nil +} + +func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) { + a.lock.Lock() + defer a.lock.Unlock() + + // TODO 可以考虑打印一点日志 + + if evt.IsNew { + a.services[evt.Info.ID] = &serviceStatus{ + Info: evt.Info, + } + } else { + status, ok := a.services[evt.Info.ID] + if !ok { + return + } + + a.releaseActor.DelayRelease(status.LockRequestIDs) + + delete(a.services, evt.Info.ID) + + // TODO 处理收到自己崩溃的消息 + } +} + +func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) { + a.lock.Lock() + defer a.lock.Unlock() + + status, ok := a.services[evt.Data.SerivceID] + if !ok { + // 加锁的是一个没有注册过的锁服务,大概率是因为这个锁服务之前网络发生了波动, + // 在波动期间它注册的信息过期,于是被当前的服务删除了。 + // 为了防止它加了这个锁之后又崩溃,导致的无限锁定,它加的锁我们都直接释放。 + a.releaseActor.Release([]string{evt.Data.ID}) + return + } + + if evt.IsLocking { + status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID) + } else { + status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID) + } +} diff --git a/pkgs/distlock/internal/utils.go b/pkgs/distlock/internal/utils.go deleted file mode 100644 index d77e5e3..0000000 --- a/pkgs/distlock/internal/utils.go +++ /dev/null @@ -1,116 +0,0 @@ -package internal - -import ( - "strings" -) - -func makeEtcdLockRequestKey(reqID string) string { - return EtcdLockRequestData + "/" + reqID -} - -func getLockRequestID(key string) string { - return strings.TrimPrefix(key, EtcdLockRequestData+"/") -} - -/* -func parseLockData(str string) (lock lockData, err error) { - sb := strings.Builder{} - var comps []string - - escaping := false - for _, ch := range strings.TrimSpace(str) { - if escaping { - if ch == 'n' { - sb.WriteRune('\n') - } else { - sb.WriteRune(ch) - } - - escaping = false - continue - } - - if ch == '/' { - comps = append(comps, sb.String()) - sb.Reset() - } else if ch == '\\' { - escaping = true - } else { - sb.WriteRune(ch) - } - } - - comps = append(comps, sb.String()) - - if len(comps) < 3 { - return lockData{}, fmt.Errorf("string must includes 3 components devided by /") - } - - return lockData{ - Path: comps[0 : len(comps)-2], - Name: comps[len(comps)-2], - Target: comps[len(comps)-1], - }, nil -} - -func lockDataToString(lock lockData) string { - sb := strings.Builder{} - - for _, s := range lock.Path { - sb.WriteString(lockDataEncoding(s)) - sb.WriteRune('/') - } - - sb.WriteString(lockDataEncoding(lock.Name)) - sb.WriteRune('/') - - sb.WriteString(lockDataEncoding(lock.Target)) - - return sb.String() -} - -func lockDataEncoding(str string) string { - sb := strings.Builder{} - - for _, ch := range str { - if ch == '\\' { - sb.WriteString("\\\\") - } else if ch == '/' { - sb.WriteString("\\/") - } else if ch == '\n' { - sb.WriteString("\\n") - } else { - sb.WriteRune(ch) - } - } - - return sb.String() -} - -func lockDataDecoding(str string) string { - sb := strings.Builder{} - - escaping := false - for _, ch := range str { - if escaping { - if ch == 'n' { - sb.WriteRune('\n') - } else { - sb.WriteRune(ch) - } - - escaping = false - continue - } - - if ch == '\\' { - escaping = true - - } else { - sb.WriteRune(ch) - } - } - - return sb.String() -} -*/ diff --git a/pkgs/distlock/internal/utils_test.go b/pkgs/distlock/internal/utils_test.go deleted file mode 100644 index 6c9f95c..0000000 --- a/pkgs/distlock/internal/utils_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package internal - -/* -import ( - . "github.com/smartystreets/goconvey/convey" -) - -func Test_parseLockData_lockDataToString(t *testing.T) { - cases := []struct { - title string - data lockData - }{ - { - title: "多段路径", - data: lockData{ - Path: []string{"a", "b", "c"}, - Name: "d", - Target: "e", - }, - }, - - { - title: "包含分隔符", - data: lockData{ - Path: []string{"a/", "b", "c/c"}, - Name: "/d", - Target: "///e//d/", - }, - }, - - { - title: "包含转义符", - data: lockData{ - Path: []string{"a\\/", "b", "\\c/c"}, - Name: "/d", - Target: "///e\\//d/\\", - }, - }, - - { - title: "包含换行符", - data: lockData{ - Path: []string{"a\n", "\nb", "c\nc"}, - Name: "/d", - Target: "e\nd\n", - }, - }, - } - - for _, ca := range cases { - Convey(ca.title, t, func() { - str := lockDataToString(ca.data) - - data, err := parseLockData(str) - - So(err, ShouldBeNil) - So(data, ShouldResemble, ca.data) - }) - } -} -*/ diff --git a/pkgs/distlock/internal/watch_etcd_actor.go b/pkgs/distlock/internal/watch_etcd_actor.go index 2d0b791..5928da6 100644 --- a/pkgs/distlock/internal/watch_etcd_actor.go +++ b/pkgs/distlock/internal/watch_etcd_actor.go @@ -3,9 +3,9 @@ package internal import ( "context" "fmt" + "strings" "gitlink.org.cn/cloudream/common/pkgs/actor" - mylo "gitlink.org.cn/cloudream/common/utils/lo" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -15,16 +15,23 @@ type LockRequestEvent struct { Data LockRequestData } -type LockRequestEventWatcher struct { - OnEvent func(events []LockRequestEvent) +type ServiceEvent struct { + IsNew bool + Info ServiceInfo } +type OnLockRequestEventFn func(event LockRequestEvent) + +type OnServiceEventFn func(event ServiceEvent) + type WatchEtcdActor struct { - etcdCli *clientv3.Client - watchChan clientv3.WatchChan - lockReqWatchers []*LockRequestEventWatcher + etcdCli *clientv3.Client - commandChan *actor.CommandChannel + watchChan clientv3.WatchChan + watchChanCancel func() + onLockRequestEventFn OnLockRequestEventFn + onServiceEventFn OnServiceEventFn + commandChan *actor.CommandChannel } func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor { @@ -34,33 +41,32 @@ func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor { } } -func (a *WatchEtcdActor) Init() { +func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn) { + a.onLockRequestEventFn = onLockRequestEvent + a.onServiceEventFn = onServiceDown } -func (a *WatchEtcdActor) StartWatching(revision int64) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) - return nil - }) -} - -func (a *WatchEtcdActor) StopWatching() error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.watchChan = nil - return nil - }) -} +func (a *WatchEtcdActor) Start(revision int64) { + actor.Wait(context.Background(), a.commandChan, func() error { + if a.watchChanCancel != nil { + a.watchChanCancel() + a.watchChanCancel = nil + } -func (a *WatchEtcdActor) AddEventWatcher(watcher *LockRequestEventWatcher) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.lockReqWatchers = append(a.lockReqWatchers, watcher) + ctx, cancel := context.WithCancel(context.Background()) + a.watchChan = a.etcdCli.Watch(ctx, EtcdWatchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) + a.watchChanCancel = cancel return nil }) } -func (a *WatchEtcdActor) RemoveEventWatcher(watcher *LockRequestEventWatcher) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.lockReqWatchers = mylo.Remove(a.lockReqWatchers, watcher) +func (a *WatchEtcdActor) Stop() { + actor.Wait(context.Background(), a.commandChan, func() error { + if a.watchChanCancel != nil { + a.watchChanCancel() + a.watchChanCancel = nil + } + a.watchChan = nil return nil }) } @@ -85,14 +91,10 @@ func (a *WatchEtcdActor) Serve() error { return fmt.Errorf("watch etcd channel closed") } - events, err := a.parseEvents(msg) + err := a.dispatchEtcdEvent(msg) if err != nil { // TODO 更好的错误处理 - return fmt.Errorf("parse etcd lock request data failed, err: %w", err) - } - - for _, w := range a.lockReqWatchers { - w.OnEvent(events) + return err } } @@ -109,41 +111,79 @@ func (a *WatchEtcdActor) Serve() error { } } -func (a *WatchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]LockRequestEvent, error) { - var events []LockRequestEvent - +func (a *WatchEtcdActor) dispatchEtcdEvent(watchResp clientv3.WatchResponse) error { for _, e := range watchResp.Events { + key := string(e.Kv.Key) - shouldParseData := false - isLocking := true - var valueData []byte - - // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index - if e.Type == clientv3.EventTypeDelete { - shouldParseData = true - isLocking = false - valueData = e.PrevKv.Value - } else if e.IsCreate() { - shouldParseData = true - isLocking = true - valueData = e.Kv.Value - } + if strings.HasPrefix(key, EtcdLockRequestDataPrefix) { + if err := a.applyLockRequestEvent(e); err != nil { + return fmt.Errorf("parsing lock request event: %w", err) + } - if !shouldParseData { - continue + } else if strings.HasPrefix(key, EtcdServiceInfoPrefix) { + if err := a.applyServiceEvent(e); err != nil { + return fmt.Errorf("parsing service event: %w", err) + } } + } - var reqData LockRequestData - err := serder.JSONToObject(valueData, &reqData) - if err != nil { - return nil, fmt.Errorf("parse lock request data failed, err: %w", err) - } + return nil +} + +func (a *WatchEtcdActor) applyLockRequestEvent(evt *clientv3.Event) error { + isLocking := true + var valueData []byte + + // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index + if evt.Type == clientv3.EventTypeDelete { + isLocking = false + valueData = evt.PrevKv.Value + } else if evt.IsCreate() { + isLocking = true + valueData = evt.Kv.Value + } else { + return nil + } + + var reqData LockRequestData + err := serder.JSONToObject(valueData, &reqData) + if err != nil { + return fmt.Errorf("parse lock request data failed, err: %w", err) + } - events = append(events, LockRequestEvent{ - IsLocking: isLocking, - Data: reqData, - }) + a.onLockRequestEventFn(LockRequestEvent{ + IsLocking: isLocking, + Data: reqData, + }) + + return nil +} + +func (a *WatchEtcdActor) applyServiceEvent(evt *clientv3.Event) error { + isNew := true + var valueData []byte + + // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index + if evt.Type == clientv3.EventTypeDelete { + isNew = false + valueData = evt.PrevKv.Value + } else if evt.IsCreate() { + isNew = true + valueData = evt.Kv.Value + } else { + return nil } - return events, nil + var svcInfo ServiceInfo + err := serder.JSONToObject(valueData, &svcInfo) + if err != nil { + return fmt.Errorf("parsing service info: %w", err) + } + + a.onServiceEventFn(ServiceEvent{ + IsNew: isNew, + Info: svcInfo, + }) + + return nil } diff --git a/pkgs/distlock/service.go b/pkgs/distlock/service.go index 6fb6430..ec08875 100644 --- a/pkgs/distlock/service.go +++ b/pkgs/distlock/service.go @@ -47,13 +47,12 @@ type Service struct { cfg *internal.Config etcdCli *clientv3.Client - acquireActor *internal.AcquireActor - releaseActor *internal.ReleaseActor - providersActor *internal.ProvidersActor - watchEtcdActor *internal.WatchEtcdActor - leaseActor *internal.LeaseActor - - lockReqEventWatcher internal.LockRequestEventWatcher + acquireActor *internal.AcquireActor + releaseActor *internal.ReleaseActor + providersActor *internal.ProvidersActor + watchEtcdActor *internal.WatchEtcdActor + leaseActor *internal.LeaseActor + serviceInfoActor *internal.ServiceInfoActor } func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error) { @@ -72,24 +71,36 @@ func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error providersActor := internal.NewProvidersActor() watchEtcdActor := internal.NewWatchEtcdActor(etcdCli) leaseActor := internal.NewLeaseActor() + serviceInfoActor := internal.NewServiceInfoActor(cfg, etcdCli) acquireActor.Init(providersActor) - providersActor.Init() - watchEtcdActor.Init() leaseActor.Init(releaseActor) + providersActor.Init() + watchEtcdActor.Init( + func(event internal.LockRequestEvent) { + providersActor.OnLockRequestEvent(event) + acquireActor.TryAcquireNow() + releaseActor.OnLockRequestEvent(event) + serviceInfoActor.OnLockRequestEvent(event) + }, + func(event internal.ServiceEvent) { + serviceInfoActor.OnServiceEvent(event) + }, + ) for _, prov := range initProvs { providersActor.AddProvider(prov.Provider, prov.Path...) } return &Service{ - cfg: cfg, - etcdCli: etcdCli, - acquireActor: acquireActor, - releaseActor: releaseActor, - providersActor: providersActor, - watchEtcdActor: watchEtcdActor, - leaseActor: leaseActor, + cfg: cfg, + etcdCli: etcdCli, + acquireActor: acquireActor, + releaseActor: releaseActor, + providersActor: providersActor, + watchEtcdActor: watchEtcdActor, + leaseActor: leaseActor, + serviceInfoActor: serviceInfoActor, }, nil } @@ -147,14 +158,6 @@ func (svc *Service) Serve() error { // 1. client退出时直接中断进程,此时AcquireActor可能正在进行重试,于是导致Etcd锁没有解除就退出了进程。 // 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用 - go func() { - // TODO 处理错误 - err := svc.providersActor.Serve() - if err != nil { - logger.Std.Warnf("serving providers actor failed, err: %s", err.Error()) - } - }() - go func() { // TODO 处理错误 err := svc.watchEtcdActor.Serve() @@ -171,34 +174,13 @@ func (svc *Service) Serve() error { } }() - revision, err := svc.loadState() + // TODO context + err := svc.resetState(context.Background()) if err != nil { // TODO 关闭其他的Actor,或者更好的错误处理方式 return fmt.Errorf("init data failed, err: %w", err) } - svc.lockReqEventWatcher.OnEvent = func(events []internal.LockRequestEvent) { - svc.acquireActor.TryAcquireNow() - svc.providersActor.ApplyLockRequestEvents(events) - } - err = svc.watchEtcdActor.AddEventWatcher(&svc.lockReqEventWatcher) - if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("add lock request event watcher failed, err: %w", err) - } - - err = svc.watchEtcdActor.StartWatching(revision) - if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("start watching etcd failed, err: %w", err) - } - - err = svc.leaseActor.StartChecking() - if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("start checking lease failed, err: %w", err) - } - // TODO 防止退出的临时解决办法 ch := make(chan any) <-ch @@ -206,51 +188,81 @@ func (svc *Service) Serve() error { return nil } -func (svc *Service) loadState() (int64, error) { - // 使用事务一次性获取index和锁数据,就不需要加全局锁了 - txResp, err := svc.etcdCli.Txn(context.Background()). +// ResetState 重置内部状态。注:只要调用到了此函数,无论在哪一步出的错, +// 都要将内部状态视为已被破坏,直到成功调用了此函数才能继续后面的步骤 +func (svc *Service) resetState(ctx context.Context) error { + // 必须使用事务一次性获取所有数据 + txResp, err := svc.etcdCli.Txn(ctx). Then( clientv3.OpGet(internal.EtcdLockRequestIndex), - clientv3.OpGet(internal.EtcdLockRequestData, clientv3.WithPrefix()), + clientv3.OpGet(internal.EtcdLockRequestDataPrefix, clientv3.WithPrefix()), + clientv3.OpGet(internal.EtcdServiceInfoPrefix, clientv3.WithPrefix()), ). Commit() if err != nil { - return 0, fmt.Errorf("get etcd data failed, err: %w", err) + return fmt.Errorf("getting etcd data: %w", err) } - indexKvs := txResp.Responses[0].GetResponseRange().Kvs - lockKvs := txResp.Responses[1].GetResponseRange().Kvs - - var index int64 - var reqData []internal.LockRequestData - // 解析Index + var index int64 = 0 + indexKvs := txResp.Responses[0].GetResponseRange().Kvs if len(indexKvs) > 0 { val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64) if err != nil { - return 0, fmt.Errorf("parse lock request index failed, err: %w", err) + return fmt.Errorf("parsing lock request index: %w", err) } index = val - - } else { - index = 0 } // 解析锁请求数据 + var reqData []internal.LockRequestData + lockKvs := txResp.Responses[1].GetResponseRange().Kvs for _, kv := range lockKvs { var req internal.LockRequestData err := serder.JSONToObject(kv.Value, &req) if err != nil { - return 0, fmt.Errorf("parse lock request data failed, err: %w", err) + return fmt.Errorf("parsing lock request data: %w", err) } reqData = append(reqData, req) } + // 解析服务信息数据 + var svcInfo []internal.ServiceInfo + svcInfoKvs := txResp.Responses[2].GetResponseRange().Kvs + for _, kv := range svcInfoKvs { + var info internal.ServiceInfo + err := serder.JSONToObject(kv.Value, &info) + if err != nil { + return fmt.Errorf("parsing service info data: %w", err) + } + + svcInfo = append(svcInfo, info) + } + + // 先停止监听等定时事件 + svc.watchEtcdActor.Stop() + svc.leaseActor.Stop() + + // 然后将新获取到的状态装填到Actor中。注:执行顺序需要考虑Actor会被谁调用,不会被调用的优先Reset。 + releasingIDs, err := svc.serviceInfoActor.ResetState(ctx, svcInfo, reqData) + if err != nil { + return fmt.Errorf("reseting service info actor: %w", err) + } + + svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID) + + svc.leaseActor.ResetState() + err = svc.providersActor.ResetState(index, reqData) if err != nil { - return 0, fmt.Errorf("reset lock providers state failed, err: %w", err) + return fmt.Errorf("reseting providers actor: %w", err) } - return txResp.Header.Revision, nil + svc.releaseActor.ResetState(releasingIDs) + + // 重置完了之后再启动监听 + svc.watchEtcdActor.Start(txResp.Header.Revision) + svc.leaseActor.Start() + return nil }