diff --git a/pkgs/distlock/distlock.go b/pkgs/distlock/distlock.go index 93091bb..2972c74 100644 --- a/pkgs/distlock/distlock.go +++ b/pkgs/distlock/distlock.go @@ -1,40 +1,18 @@ package distlock -import "fmt" +import ( + "fmt" -type Lock struct { - Path []string // 锁路径,存储的是路径的每一部分 - Name string // 锁名 - Target any // 锁对象,由具体的Provider去解析 -} - -type LockRequest struct { - Locks []Lock -} - -func (b *LockRequest) Add(lock Lock) { - b.Locks = append(b.Locks, lock) -} - -type LockProvider interface { - // CanLock 判断这个锁能否锁定成功 - CanLock(lock Lock) error + "gitlink.org.cn/cloudream/common/pkgs/distlock/internal" +) - // Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。 - Lock(reqID string, lock Lock) error +type Lock = internal.Lock - // 解锁 - Unlock(reqID string, lock Lock) error +type LockRequest = internal.LockRequest - // GetTargetString 将锁对象序列化为字符串,方便存储到ETCD - GetTargetString(target any) (string, error) +type LockProvider = internal.LockProvider - // ParseTargetString 解析字符串格式的锁对象数据 - ParseTargetString(targetStr string) (any, error) - - // Clear 清除内部所有状态 - Clear() -} +type Config = internal.Config type LockTargetBusyError struct { lockName string diff --git a/pkgs/distlock/service/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go similarity index 88% rename from pkgs/distlock/service/internal/acquire_actor.go rename to pkgs/distlock/internal/acquire_actor.go index e7a2a39..770ad9c 100644 --- a/pkgs/distlock/service/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -7,7 +7,6 @@ import ( "strconv" "sync" - "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" mylo "gitlink.org.cn/cloudream/common/utils/lo" @@ -18,30 +17,14 @@ import ( var ErrAcquiringTimeout = errors.New("acquiring timeout") -const ( - EtcdLockRequestData = "/distlock/lockRequest/data" - EtcdLockRequestIndex = "/distlock/lockRequest/index" - EtcdLockRequestLock = "/distlock/lockRequest/lock" -) - -type lockData struct { - Path []string `json:"path"` - Name string `json:"name"` - Target string `json:"target"` -} -type LockRequestData struct { - ID string `json:"id"` - Locks []lockData `json:"locks"` -} - type acquireInfo struct { - Request distlock.LockRequest + Request LockRequest Callback *future.SetValueFuture[string] LastErr error } type AcquireActor struct { - cfg *distlock.Config + cfg *Config etcdCli *clientv3.Client providersActor *ProvidersActor @@ -49,7 +32,7 @@ type AcquireActor struct { lock sync.Mutex } -func NewAcquireActor(cfg *distlock.Config, etcdCli *clientv3.Client) *AcquireActor { +func NewAcquireActor(cfg *Config, etcdCli *clientv3.Client) *AcquireActor { return &AcquireActor{ cfg: cfg, etcdCli: etcdCli, @@ -61,7 +44,7 @@ func (a *AcquireActor) Init(providersActor *ProvidersActor) { } // Acquire 请求一批锁。成功后返回锁请求ID -func (a *AcquireActor) Acquire(ctx context.Context, req distlock.LockRequest) (string, error) { +func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, error) { info := &acquireInfo{ Request: req, Callback: future.NewSetValue[string](), diff --git a/pkgs/distlock/config.go b/pkgs/distlock/internal/config.go similarity index 97% rename from pkgs/distlock/config.go rename to pkgs/distlock/internal/config.go index ef2acb1..b6219f3 100644 --- a/pkgs/distlock/config.go +++ b/pkgs/distlock/internal/config.go @@ -1,4 +1,4 @@ -package distlock +package internal type Config struct { EtcdAddress string `json:"etcdAddress"` diff --git a/pkgs/distlock/service/internal/lease_actor.go b/pkgs/distlock/internal/lease_actor.go similarity index 100% rename from pkgs/distlock/service/internal/lease_actor.go rename to pkgs/distlock/internal/lease_actor.go diff --git a/pkgs/distlock/internal/models.go b/pkgs/distlock/internal/models.go new file mode 100644 index 0000000..b0c7eeb --- /dev/null +++ b/pkgs/distlock/internal/models.go @@ -0,0 +1,52 @@ +package internal + +const ( + EtcdLockRequestData = "/distlock/lockRequest/data" + EtcdLockRequestIndex = "/distlock/lockRequest/index" + EtcdLockRequestLock = "/distlock/lockRequest/lock" +) + +type Lock struct { + Path []string // 锁路径,存储的是路径的每一部分 + Name string // 锁名 + Target any // 锁对象,由具体的Provider去解析 +} + +type LockRequest struct { + Locks []Lock +} + +func (b *LockRequest) Add(lock Lock) { + b.Locks = append(b.Locks, lock) +} + +type LockProvider interface { + // CanLock 判断这个锁能否锁定成功 + CanLock(lock Lock) error + + // Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。 + Lock(reqID string, lock Lock) error + + // 解锁 + Unlock(reqID string, lock Lock) error + + // GetTargetString 将锁对象序列化为字符串,方便存储到ETCD + GetTargetString(target any) (string, error) + + // ParseTargetString 解析字符串格式的锁对象数据 + ParseTargetString(targetStr string) (any, error) + + // Clear 清除内部所有状态 + Clear() +} + +type lockData struct { + Path []string `json:"path"` + Name string `json:"name"` + Target string `json:"target"` +} + +type LockRequestData struct { + ID string `json:"id"` + Locks []lockData `json:"locks"` +} diff --git a/pkgs/distlock/service/internal/providers_actor.go b/pkgs/distlock/internal/providers_actor.go similarity index 92% rename from pkgs/distlock/service/internal/providers_actor.go rename to pkgs/distlock/internal/providers_actor.go index c81d75c..23be57e 100644 --- a/pkgs/distlock/service/internal/providers_actor.go +++ b/pkgs/distlock/internal/providers_actor.go @@ -5,7 +5,6 @@ import ( "fmt" "gitlink.org.cn/cloudream/common/pkgs/actor" - "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" @@ -18,8 +17,8 @@ type indexWaiter struct { type ProvidersActor struct { localLockReqIndex int64 - provdersTrie trie.Trie[distlock.LockProvider] - allProviders []distlock.LockProvider + provdersTrie trie.Trie[LockProvider] + allProviders []LockProvider indexWaiters []indexWaiter @@ -32,7 +31,7 @@ func NewProvidersActor() *ProvidersActor { } } -func (a *ProvidersActor) AddProvider(prov distlock.LockProvider, path ...any) { +func (a *ProvidersActor) AddProvider(prov LockProvider, path ...any) { a.provdersTrie.Create(path).Value = prov a.allProviders = append(a.allProviders, prov) } @@ -97,7 +96,7 @@ func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error { return fmt.Errorf("parse target data failed, err: %w", err) } - err = node.Value.Lock(reqData.ID, distlock.Lock{ + err = node.Value.Lock(reqData.ID, Lock{ Path: lockData.Path, Name: lockData.Name, Target: target, @@ -121,7 +120,7 @@ func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error { return fmt.Errorf("parse target data failed, err: %w", err) } - err = node.Value.Unlock(reqData.ID, distlock.Lock{ + err = node.Value.Unlock(reqData.ID, Lock{ Path: lockData.Path, Name: lockData.Name, Target: target, @@ -135,7 +134,7 @@ func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error { // TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID。 // 在检查单个锁是否能上锁时,不会考虑同一个锁请求中的其他的锁影响。简单来说,就是同一个请求中的锁可以互相冲突。 -func (a *ProvidersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (LockRequestData, error) { +func (a *ProvidersActor) TestLockRequestAndMakeData(req LockRequest) (LockRequestData, error) { return actor.WaitValue(context.TODO(), a.commandChan, func() (LockRequestData, error) { reqData := LockRequestData{} diff --git a/pkgs/distlock/service/internal/release_actor.go b/pkgs/distlock/internal/release_actor.go similarity index 95% rename from pkgs/distlock/service/internal/release_actor.go rename to pkgs/distlock/internal/release_actor.go index c633d53..4105350 100644 --- a/pkgs/distlock/service/internal/release_actor.go +++ b/pkgs/distlock/internal/release_actor.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/logger" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/clientv3util" @@ -20,7 +19,7 @@ const ( ) type ReleaseActor struct { - cfg *distlock.Config + cfg *Config etcdCli *clientv3.Client releasingLockRequestIDs map[string]bool @@ -29,7 +28,7 @@ type ReleaseActor struct { lock sync.Mutex } -func NewReleaseActor(cfg *distlock.Config, etcdCli *clientv3.Client) *ReleaseActor { +func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor { return &ReleaseActor{ cfg: cfg, etcdCli: etcdCli, diff --git a/pkgs/distlock/service/internal/utils.go b/pkgs/distlock/internal/utils.go similarity index 100% rename from pkgs/distlock/service/internal/utils.go rename to pkgs/distlock/internal/utils.go diff --git a/pkgs/distlock/service/internal/utils_test.go b/pkgs/distlock/internal/utils_test.go similarity index 100% rename from pkgs/distlock/service/internal/utils_test.go rename to pkgs/distlock/internal/utils_test.go diff --git a/pkgs/distlock/service/internal/watch_etcd_actor.go b/pkgs/distlock/internal/watch_etcd_actor.go similarity index 100% rename from pkgs/distlock/service/internal/watch_etcd_actor.go rename to pkgs/distlock/internal/watch_etcd_actor.go diff --git a/pkgs/distlock/service/mutex.go b/pkgs/distlock/mutex.go similarity index 64% rename from pkgs/distlock/service/mutex.go rename to pkgs/distlock/mutex.go index abba728..a7c4d23 100644 --- a/pkgs/distlock/service/mutex.go +++ b/pkgs/distlock/mutex.go @@ -1,14 +1,14 @@ -package service +package distlock -import "gitlink.org.cn/cloudream/common/pkgs/distlock" +import "gitlink.org.cn/cloudream/common/pkgs/distlock/internal" type Mutex struct { svc *Service - lockReq distlock.LockRequest + lockReq internal.LockRequest lockReqID string } -func NewMutex(svc *Service, lockReq distlock.LockRequest) *Mutex { +func NewMutex(svc *Service, lockReq internal.LockRequest) *Mutex { return &Mutex{ svc: svc, lockReq: lockReq, diff --git a/pkgs/distlock/service/service.go b/pkgs/distlock/service.go similarity index 92% rename from pkgs/distlock/service/service.go rename to pkgs/distlock/service.go index 8cb8e34..6fb6430 100644 --- a/pkgs/distlock/service/service.go +++ b/pkgs/distlock/service.go @@ -1,4 +1,4 @@ -package service +package distlock import ( "context" @@ -6,8 +6,7 @@ import ( "strconv" "time" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/distlock/service/internal" + "gitlink.org.cn/cloudream/common/pkgs/distlock/internal" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" @@ -34,10 +33,10 @@ func WithLease(time time.Duration) AcquireOptionFn { type PathProvider struct { Path []any - Provider distlock.LockProvider + Provider internal.LockProvider } -func NewPathProvider(prov distlock.LockProvider, path ...any) PathProvider { +func NewPathProvider(prov internal.LockProvider, path ...any) PathProvider { return PathProvider{ Path: path, Provider: prov, @@ -45,7 +44,7 @@ func NewPathProvider(prov distlock.LockProvider, path ...any) PathProvider { } type Service struct { - cfg *distlock.Config + cfg *internal.Config etcdCli *clientv3.Client acquireActor *internal.AcquireActor @@ -57,7 +56,7 @@ type Service struct { lockReqEventWatcher internal.LockRequestEventWatcher } -func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error) { +func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error) { etcdCli, err := clientv3.New(clientv3.Config{ Endpoints: []string{cfg.EtcdAddress}, Username: cfg.EtcdUsername, @@ -95,7 +94,7 @@ func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error } // Acquire 请求一批锁。成功后返回锁请求ID -func (svc *Service) Acquire(req distlock.LockRequest, opts ...AcquireOptionFn) (string, error) { +func (svc *Service) Acquire(req internal.LockRequest, opts ...AcquireOptionFn) (string, error) { var opt = AcquireOption{ Timeout: time.Second * 10, } @@ -145,7 +144,7 @@ func (svc *Service) Release(reqID string) { func (svc *Service) Serve() error { // TODO 需要停止service的方法 // 目前已知问题: - // 1. client退出时直接中断进程,此时RetryActor可能正在进行Retry,于是导致Etcd锁没有解除就退出了进程。 + // 1. client退出时直接中断进程,此时AcquireActor可能正在进行重试,于是导致Etcd锁没有解除就退出了进程。 // 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用 go func() {