diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index d29de48..e6bbb90 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -164,7 +164,9 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { conCol.CollectInPlace() // 公共锁 - publock := publock.NewService() + publock := publock.NewMaster() + publock.Start() + defer publock.Stop() // 访问统计 acStat := accessstat.NewAccessStat(accessstat.Config{ diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 21bed2b..bb5707b 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -175,7 +175,9 @@ func test(configPath string) { conCol.CollectInPlace() // 公共锁 - publock := publock.NewService() + publock := publock.NewMaster() + publock.Start() + defer publock.Stop() // 访问统计 acStat := accessstat.NewAccessStat(accessstat.Config{ diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index c7229af..93589e3 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -155,7 +155,9 @@ func vfsTest(configPath string, opts serveHTTPOptions) { conCol.CollectInPlace() // 公共锁 - publock := publock.NewService() + publock := publock.NewMaster() + publock.Start() + defer publock.Stop() // 访问统计 acStat := accessstat.NewAccessStat(accessstat.Config{ diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index 009a202..646fa74 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -29,7 +29,7 @@ type downloadSpaceInfo struct { } type DownloadContext struct { - PubLock *publock.Service + PubLock *publock.PubLock } type DownloadObjectIterator struct { OnClosing func() diff --git a/client/internal/http/v1/package.go b/client/internal/http/v1/package.go index cb7a9d7..a50316a 100644 --- a/client/internal/http/v1/package.go +++ b/client/internal/http/v1/package.go @@ -19,7 +19,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator" @@ -400,7 +399,7 @@ func (s *PackageService) Pin(ctx *gin.Context) { return } - lock, err := reqbuilder.NewBuilder().Package().Pin(req.PackageID).MutexLock(s.svc.PubLock) + lock, err := s.svc.PubLock.BeginMutex().Package().Pin(req.PackageID).End().Lock() if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "lock package: %v", err)) return diff --git a/client/internal/http/v1/pub_shards.go b/client/internal/http/v1/pub_shards.go index bc5e7f2..cc71c7e 100644 --- a/client/internal/http/v1/pub_shards.go +++ b/client/internal/http/v1/pub_shards.go @@ -16,7 +16,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" @@ -228,7 +227,7 @@ func (s *PubShardsService) ExportPackage(ctx *gin.Context) { return } - lock, err := reqbuilder.NewBuilder().Package().Pin(req.PackageID).MutexLock(s.svc.PubLock) + lock, err := s.svc.PubLock.BeginMutex().Package().Pin(req.PackageID).End().Lock() if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "lock package: %v", err)) return diff --git a/client/internal/publock/core.go b/client/internal/publock/core.go new file mode 100644 index 0000000..5368b84 --- /dev/null +++ b/client/internal/publock/core.go @@ -0,0 +1,197 @@ +package publock + +import ( + "context" + "fmt" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/trie" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/lockprovider" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" +) + +type Core struct { + lock *sync.Mutex + provdersTrie *trie.Trie[types.LockProvider] + acquirings []*acquireInfo + acquired map[types.RequestID]types.LockRequest + nextReqID int64 +} + +func NewCore() *Core { + svc := &Core{ + lock: &sync.Mutex{}, + provdersTrie: trie.NewTrie[types.LockProvider](), + acquired: make(map[types.RequestID]types.LockRequest), + } + + svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock() + svc.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock() + return svc +} + +func (s *Core) Start() { + +} + +func (s *Core) Stop() { + +} + +type acquireInfo struct { + Request types.LockRequest + Callback *future.SetValueFuture[types.RequestID] + LastErr error +} + +func (svc *Core) Acquire(req types.LockRequest, opts ...AcquireOptionFn) (LockedRequest, error) { + var opt = AcquireOption{ + Timeout: time.Second * 10, + } + for _, fn := range opts { + fn(&opt) + } + + ctx := context.Background() + if opt.Timeout != 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, opt.Timeout) + defer cancel() + } + + // 就地检测锁是否可用 + svc.lock.Lock() + defer svc.lock.Unlock() + + reqID, err := svc.tryAcquireOne(req) + if err != nil { + return LockedRequest{}, err + } + + if reqID != "" { + svc.acquired[reqID] = req + return LockedRequest{ + Req: req, + ReqID: reqID, + }, nil + } + + // 就地检测失败,那么就需要异步等待锁可用 + info := &acquireInfo{ + Request: req, + Callback: future.NewSetValue[types.RequestID](), + } + svc.acquirings = append(svc.acquirings, info) + + // 等待的时候不加锁 + svc.lock.Unlock() + reqID, err = info.Callback.Wait(ctx) + svc.lock.Lock() + + if err == nil { + svc.acquired[reqID] = req + return LockedRequest{ + Req: req, + ReqID: reqID, + }, nil + } + + if err != future.ErrCanceled { + lo2.Remove(svc.acquirings, info) + return LockedRequest{}, err + } + + // 如果第一次等待是超时错误,那么在锁里再尝试获取一次结果 + reqID, err = info.Callback.TryGetValue() + if err == nil { + svc.acquired[reqID] = req + return LockedRequest{ + Req: req, + ReqID: reqID, + }, nil + } + + lo2.Remove(svc.acquirings, info) + return LockedRequest{}, err +} + +func (s *Core) release(reqID types.RequestID) { + s.lock.Lock() + defer s.lock.Unlock() + + req, ok := s.acquired[reqID] + if !ok { + return + } + + s.releaseRequest(reqID, req) + s.tryAcquirings() +} + +func (a *Core) tryAcquirings() { + for i := 0; i < len(a.acquirings); i++ { + req := a.acquirings[i] + + reqID, err := a.tryAcquireOne(req.Request) + if err != nil { + req.LastErr = err + continue + } + + req.Callback.SetValue(reqID) + a.acquirings[i] = nil + } + + a.acquirings = lo2.RemoveAllDefault(a.acquirings) +} + +func (s *Core) tryAcquireOne(req types.LockRequest) (types.RequestID, error) { + err := s.testOneRequest(req) + if err != nil { + return "", err + } + + reqID := types.RequestID(fmt.Sprintf("%d", s.nextReqID)) + s.nextReqID++ + + s.applyRequest(reqID, req) + return reqID, nil +} + +func (s *Core) testOneRequest(req types.LockRequest) error { + for _, lock := range req.Locks { + n, ok := s.provdersTrie.WalkEnd(lock.Path) + if !ok || n.Value == nil { + return fmt.Errorf("lock provider not found for path %v", lock.Path) + } + + err := n.Value.CanLock(lock) + if err != nil { + return err + } + } + + return nil +} + +func (s *Core) applyRequest(reqID types.RequestID, req types.LockRequest) { + for _, lock := range req.Locks { + p, _ := s.provdersTrie.WalkEnd(lock.Path) + p.Value.Lock(reqID, lock) + } +} + +func (s *Core) releaseRequest(reqID types.RequestID, req types.LockRequest) { + for _, lock := range req.Locks { + p, _ := s.provdersTrie.WalkEnd(lock.Path) + p.Value.Unlock(reqID, lock) + } +} + +type LockedRequest struct { + Req types.LockRequest + ReqID types.RequestID +} diff --git a/client/internal/publock/mutex.go b/client/internal/publock/mutex.go index 85abc97..475a039 100644 --- a/client/internal/publock/mutex.go +++ b/client/internal/publock/mutex.go @@ -1,15 +1,34 @@ package publock import ( + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" ) type Mutex struct { - svc *Service - lockReq types.LockRequest - lockReqID types.RequestID + pub *PubLock + locked LockedRequest } func (m *Mutex) Unlock() { - m.svc.release(m.lockReqID, m.lockReq) + m.pub.release(m.locked.ReqID) +} + +type MutexBuilder struct { + reqbuilder.LockRequestBuilder[*MutexBuilder] + pub *PubLock +} + +func (b *MutexBuilder) Lock(opt ...AcquireOptionFn) (*Mutex, error) { + lkd, err := b.pub.acquire(types.LockRequest{ + Locks: b.Locks, + }, opt...) + if err != nil { + return nil, err + } + + return &Mutex{ + pub: b.pub, + locked: lkd, + }, nil } diff --git a/client/internal/publock/reentrant.go b/client/internal/publock/reentrant.go index dda4bbf..ecb35f6 100644 --- a/client/internal/publock/reentrant.go +++ b/client/internal/publock/reentrant.go @@ -1,18 +1,22 @@ package publock -import "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" +import ( + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" +) type Reentrant struct { - svc *Service + reqbuilder.LockRequestBuilder[*Reentrant] + p *PubLock reqs []types.LockRequest - locked []*Mutex + locked []LockedRequest } -func (r *Reentrant) Lock(req types.LockRequest, opt ...AcquireOptionFn) error { +func (r *Reentrant) Lock(opt ...AcquireOptionFn) error { var willLock []types.Lock loop: - for _, lock := range req.Locks { + for _, lock := range r.LockRequestBuilder.Locks { for _, req := range r.reqs { for _, locked := range req.Locks { if locked.Equals(lock) { @@ -23,17 +27,17 @@ loop: willLock = append(willLock, lock) } + r.LockRequestBuilder.Locks = nil if len(willLock) == 0 { return nil } newReq := types.LockRequest{ - Reason: req.Reason, - Locks: willLock, + Locks: willLock, } - m, err := r.svc.Acquire(newReq, opt...) + m, err := r.p.acquire(newReq, opt...) if err != nil { return err } @@ -46,7 +50,7 @@ loop: func (r *Reentrant) Unlock() { for i := len(r.reqs) - 1; i >= 0; i-- { - r.locked[i].Unlock() + r.p.release(r.locked[i].ReqID) } r.locked = nil r.reqs = nil diff --git a/client/internal/publock/reqbuilder/lock_request_builder.go b/client/internal/publock/reqbuilder/lock_request_builder.go index 327b311..927bcbb 100644 --- a/client/internal/publock/reqbuilder/lock_request_builder.go +++ b/client/internal/publock/reqbuilder/lock_request_builder.go @@ -1,29 +1,14 @@ package reqbuilder import ( - "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" ) -type LockRequestBuilder struct { - locks []types.Lock +type LockRequestBuilder[T any] struct { + Locks []types.Lock + T T } -func NewBuilder() *LockRequestBuilder { - return &LockRequestBuilder{} -} - -func (b *LockRequestBuilder) IsEmpty() bool { - return len(b.locks) == 0 -} - -func (b *LockRequestBuilder) Build() types.LockRequest { - return types.LockRequest{ - Locks: lo2.ArrayClone(b.locks), - } -} - -func (b *LockRequestBuilder) MutexLock(svc *publock.Service, opt ...publock.AcquireOptionFn) (*publock.Mutex, error) { - return svc.Acquire(b.Build(), opt...) +func (b *LockRequestBuilder[T]) End() T { + return b.T } diff --git a/client/internal/publock/reqbuilder/package.go b/client/internal/publock/reqbuilder/package.go index 8a17e18..68ae9c7 100644 --- a/client/internal/publock/reqbuilder/package.go +++ b/client/internal/publock/reqbuilder/package.go @@ -8,15 +8,15 @@ import ( jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" ) -type PackageLockReqBuilder struct { - *LockRequestBuilder +type PackageLockReqBuilder[T any] struct { + *LockRequestBuilder[T] } -func (b *LockRequestBuilder) Package() *PackageLockReqBuilder { - return &PackageLockReqBuilder{LockRequestBuilder: b} +func (b *LockRequestBuilder[T]) Package() *PackageLockReqBuilder[T] { + return &PackageLockReqBuilder[T]{LockRequestBuilder: b} } -func (b *PackageLockReqBuilder) Buzy(pkgID jcstypes.PackageID) *PackageLockReqBuilder { - b.locks = append(b.locks, types.Lock{ +func (b *PackageLockReqBuilder[T]) Buzy(pkgID jcstypes.PackageID) *PackageLockReqBuilder[T] { + b.Locks = append(b.Locks, types.Lock{ Path: b.makePath(pkgID), Name: lockprovider.PackageBuzyLock, Target: lockprovider.NewEmptyTarget(), @@ -24,8 +24,8 @@ func (b *PackageLockReqBuilder) Buzy(pkgID jcstypes.PackageID) *PackageLockReqBu return b } -func (b *PackageLockReqBuilder) Pin(pkgID jcstypes.PackageID) *PackageLockReqBuilder { - b.locks = append(b.locks, types.Lock{ +func (b *PackageLockReqBuilder[T]) Pin(pkgID jcstypes.PackageID) *PackageLockReqBuilder[T] { + b.Locks = append(b.Locks, types.Lock{ Path: b.makePath(pkgID), Name: lockprovider.PackagePinLock, Target: lockprovider.NewEmptyTarget(), @@ -33,6 +33,6 @@ func (b *PackageLockReqBuilder) Pin(pkgID jcstypes.PackageID) *PackageLockReqBui return b } -func (b *PackageLockReqBuilder) makePath(pkgID jcstypes.PackageID) []string { +func (b *PackageLockReqBuilder[T]) makePath(pkgID jcstypes.PackageID) []string { return []string{lockprovider.PackageLockPathPrefix, strconv.FormatInt(int64(pkgID), 10)} } diff --git a/client/internal/publock/reqbuilder/user_space.go b/client/internal/publock/reqbuilder/user_space.go index 5ddf550..5a51323 100644 --- a/client/internal/publock/reqbuilder/user_space.go +++ b/client/internal/publock/reqbuilder/user_space.go @@ -8,15 +8,15 @@ import ( jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" ) -type UserSpaceLockReqBuilder struct { - *LockRequestBuilder +type UserSpaceLockReqBuilder[T any] struct { + *LockRequestBuilder[T] } -func (b *LockRequestBuilder) UserSpace() *UserSpaceLockReqBuilder { - return &UserSpaceLockReqBuilder{LockRequestBuilder: b} +func (b *LockRequestBuilder[T]) UserSpace() *UserSpaceLockReqBuilder[T] { + return &UserSpaceLockReqBuilder[T]{LockRequestBuilder: b} } -func (b *UserSpaceLockReqBuilder) Buzy(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder { - b.locks = append(b.locks, types.Lock{ +func (b *UserSpaceLockReqBuilder[T]) Buzy(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder[T] { + b.Locks = append(b.Locks, types.Lock{ Path: b.makePath(spaceID), Name: lockprovider.UserSpaceBuzyLock, Target: lockprovider.NewEmptyTarget(), @@ -24,8 +24,8 @@ func (b *UserSpaceLockReqBuilder) Buzy(spaceID jcstypes.UserSpaceID) *UserSpaceL return b } -func (b *UserSpaceLockReqBuilder) GC(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder { - b.locks = append(b.locks, types.Lock{ +func (b *UserSpaceLockReqBuilder[T]) GC(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder[T] { + b.Locks = append(b.Locks, types.Lock{ Path: b.makePath(spaceID), Name: lockprovider.UserSpaceGCLock, Target: lockprovider.NewEmptyTarget(), @@ -33,6 +33,6 @@ func (b *UserSpaceLockReqBuilder) GC(spaceID jcstypes.UserSpaceID) *UserSpaceLoc return b } -func (b *UserSpaceLockReqBuilder) makePath(hubID jcstypes.UserSpaceID) []string { +func (b *UserSpaceLockReqBuilder[T]) makePath(hubID jcstypes.UserSpaceID) []string { return []string{lockprovider.UserSpaceLockPathPrefix, strconv.FormatInt(int64(hubID), 10)} } diff --git a/client/internal/publock/service.go b/client/internal/publock/service.go index 76ee8dc..79c27c9 100644 --- a/client/internal/publock/service.go +++ b/client/internal/publock/service.go @@ -7,14 +7,14 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/trie" - "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/lockprovider" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" ) type AcquireOption struct { Timeout time.Duration + Reason string } type AcquireOptionFn func(opt *AcquireOption) @@ -25,171 +25,214 @@ func WithTimeout(timeout time.Duration) AcquireOptionFn { } } -type Service struct { - lock *sync.Mutex - provdersTrie *trie.Trie[types.LockProvider] - acquirings []*acquireInfo - nextReqID int64 +func WithReason(reason string) AcquireOptionFn { + return func(opt *AcquireOption) { + opt.Reason = reason + } } -func NewService() *Service { - svc := &Service{ - lock: &sync.Mutex{}, - provdersTrie: trie.NewTrie[types.LockProvider](), - } +type PubLock struct { + core *Core + cliCli *clirpc.Client + pubChan clirpc.PubLockMessageChan + lock sync.Mutex + acquirings map[string]*acquireInfo + releasing map[string]*releaseInfo + nextCtxID int64 +} - svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock() - svc.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock() - return svc +func NewMaster() *PubLock { + core := NewCore() + return &PubLock{ + core: core, + } } -type acquireInfo struct { - Request types.LockRequest - Callback *future.SetValueFuture[types.RequestID] - LastErr error +func NewSlave(cli *clirpc.Client) *PubLock { + return &PubLock{ + cliCli: cli, + acquirings: make(map[string]*acquireInfo), + releasing: make(map[string]*releaseInfo), + } } -func (svc *Service) Acquire(req types.LockRequest, opts ...AcquireOptionFn) (*Mutex, error) { - var opt = AcquireOption{ - Timeout: time.Second * 10, +func (s *PubLock) BeginReentrant() *Reentrant { + r := &Reentrant{ + p: s, } - for _, fn := range opts { - fn(&opt) + r.T = r + return r +} + +func (p *PubLock) BeginMutex() *MutexBuilder { + m := &MutexBuilder{ + pub: p, } + m.T = m + return m +} - ctx := context.Background() - if opt.Timeout != 0 { - var cancel func() - ctx, cancel = context.WithTimeout(ctx, opt.Timeout) - defer cancel() +func (p *PubLock) Start() { + if p.core != nil { + p.core.Start() + return } +} - // 就地检测锁是否可用 - svc.lock.Lock() - defer svc.lock.Unlock() +func (p *PubLock) Stop() { + p.lock.Lock() + defer p.lock.Unlock() - reqID, err := svc.tryAcquireOne(req) - if err != nil { - return nil, err + if p.core != nil { + p.core.Stop() + return } - if reqID != "" { - return &Mutex{ - svc: svc, - lockReq: req, - lockReqID: reqID, - }, nil + if p.pubChan != nil { + p.pubChan.Close() + p.pubChan = nil } - // 就地检测失败,那么就需要异步等待锁可用 - info := &acquireInfo{ - Request: req, - Callback: future.NewSetValue[types.RequestID](), - } - svc.acquirings = append(svc.acquirings, info) + p.cliCli.Release() + p.cliCli = nil +} - // 等待的时候不加锁 - svc.lock.Unlock() - reqID, err = info.Callback.Wait(ctx) - svc.lock.Lock() +func (p *PubLock) acquire(req types.LockRequest, opts ...AcquireOptionFn) (LockedRequest, error) { + p.lock.Lock() - if err == nil { - return &Mutex{ - svc: svc, - lockReq: req, - lockReqID: reqID, - }, nil + if p.core != nil { + p.lock.Unlock() + return p.core.Acquire(req, opts...) } - if err != future.ErrCanceled { - lo2.Remove(svc.acquirings, info) - return nil, err + if p.pubChan == nil { + p.pubChan = p.cliCli.PubLockChannel(context.Background()) + go p.receivingChan() } - // 如果第一次等待是超时错误,那么在锁里再尝试获取一次结果 - reqID, err = info.Callback.TryGetValue() - if err == nil { - return &Mutex{ - svc: svc, - lockReq: req, - lockReqID: reqID, - }, nil + acqID := fmt.Sprintf("%v", p.nextCtxID) + p.nextCtxID++ + + cerr := p.pubChan.Send(&types.AcquireMsg{ContextID: acqID, Request: req}) + if cerr != nil { + p.lock.Unlock() + return LockedRequest{}, cerr.ToError() } - lo2.Remove(svc.acquirings, info) - return nil, err -} + callback := future.NewSetValue[types.RequestID]() + info := &acquireInfo{ + Request: req, + Callback: callback, + } + p.acquirings[acqID] = info + p.lock.Unlock() -func (s *Service) BeginReentrant() *Reentrant { - return &Reentrant{ - svc: s, + reqID, err := callback.Wait(context.Background()) + if err != nil { + return LockedRequest{}, err } + + return LockedRequest{ + Req: req, + ReqID: reqID, + }, nil } -func (s *Service) release(reqID types.RequestID, req types.LockRequest) { - s.lock.Lock() - defer s.lock.Unlock() +func (p *PubLock) release(reqID types.RequestID) { + log := logger.WithField("Mod", "PubLock") - s.releaseRequest(reqID, req) - s.tryAcquirings() -} + p.lock.Lock() -func (a *Service) tryAcquirings() { - for i := 0; i < len(a.acquirings); i++ { - req := a.acquirings[i] + if p.core != nil { + p.lock.Unlock() + p.core.release(reqID) + return + } - reqID, err := a.tryAcquireOne(req.Request) - if err != nil { - req.LastErr = err - continue - } + if p.pubChan == nil { + p.pubChan = p.cliCli.PubLockChannel(context.Background()) + go p.receivingChan() + } - req.Callback.SetValue(reqID) - a.acquirings[i] = nil + relID := fmt.Sprintf("%v", p.nextCtxID) + p.nextCtxID++ + + cerr := p.pubChan.Send(&types.ReleaseMsg{ContextID: relID, RequestID: reqID}) + if cerr != nil { + p.lock.Unlock() + log.Warnf("unlock %v: %v", reqID, cerr.ToError()) + return } - a.acquirings = lo2.RemoveAllDefault(a.acquirings) -} + callback := future.NewSetVoid() + info := &releaseInfo{ + RequestID: reqID, + Callback: callback, + } + p.releasing[relID] = info + p.lock.Unlock() -func (s *Service) tryAcquireOne(req types.LockRequest) (types.RequestID, error) { - err := s.testOneRequest(req) + err := callback.Wait(context.Background()) if err != nil { - return "", err + log.Warnf("unlock %v: %v", reqID, err) + return } - reqID := types.RequestID(fmt.Sprintf("%d", s.nextReqID)) - s.nextReqID++ - - s.applyRequest(reqID, req) - return reqID, nil + log.Tracef("unlock %v", reqID) } -func (s *Service) testOneRequest(req types.LockRequest) error { - for _, lock := range req.Locks { - n, ok := s.provdersTrie.WalkEnd(lock.Path) - if !ok || n.Value == nil { - return fmt.Errorf("lock provider not found for path %v", lock.Path) +func (p *PubLock) receivingChan() { + log := logger.WithField("Mod", "PubLock") + for { + msg, cerr := p.pubChan.Receive() + if cerr != nil { + p.lock.Lock() + for _, info := range p.acquirings { + info.Callback.SetError(cerr.ToError()) + } + p.acquirings = make(map[string]*acquireInfo) + + for _, info := range p.releasing { + info.Callback.SetError(cerr.ToError()) + } + p.releasing = make(map[string]*releaseInfo) + p.lock.Unlock() + + log.Warnf("receive channel error: %v", cerr.ToError()) + return } - err := n.Value.CanLock(lock) - if err != nil { - return err + p.lock.Lock() + + switch msg := msg.(type) { + case *types.AcquireResultMsg: + info, ok := p.acquirings[msg.ContextID] + if !ok { + continue + } + + if msg.Success { + info.Callback.SetValue(msg.RequestID) + } else { + info.Callback.SetError(fmt.Errorf(msg.Reason)) + } + delete(p.acquirings, msg.ContextID) + + case *types.ReleaseResultMsg: + info, ok := p.releasing[msg.ContextID] + if !ok { + continue + } + + info.Callback.SetVoid() + delete(p.releasing, msg.ContextID) } - } - - return nil -} -func (s *Service) applyRequest(reqID types.RequestID, req types.LockRequest) { - for _, lock := range req.Locks { - p, _ := s.provdersTrie.WalkEnd(lock.Path) - p.Value.Lock(reqID, lock) + p.lock.Unlock() } } -func (s *Service) releaseRequest(reqID types.RequestID, req types.LockRequest) { - for _, lock := range req.Locks { - p, _ := s.provdersTrie.WalkEnd(lock.Path) - p.Value.Unlock(reqID, lock) - } +type releaseInfo struct { + RequestID types.RequestID + Callback *future.SetVoidFuture } diff --git a/client/internal/publock/types/channel.go b/client/internal/publock/types/channel.go new file mode 100644 index 0000000..dc53bfd --- /dev/null +++ b/client/internal/publock/types/channel.go @@ -0,0 +1,30 @@ +package types + +type AcquireMsg struct { + ContextID string + Request LockRequest +} + +func (*AcquireMsg) IsPubLockMessage() bool { return true } + +type AcquireResultMsg struct { + ContextID string + Success bool + Reason string + RequestID RequestID +} + +func (*AcquireResultMsg) IsPubLockMessage() bool { return true } + +type ReleaseMsg struct { + ContextID string + RequestID RequestID +} + +func (*ReleaseMsg) IsPubLockMessage() bool { return true } + +type ReleaseResultMsg struct { + ContextID string +} + +func (*ReleaseResultMsg) IsPubLockMessage() bool { return true } diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 6917451..485a228 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" @@ -682,7 +681,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID jcstypes.ObjectID, in objBlkMap[blk.Index] = blk } - lockBld := reqbuilder.NewBuilder() + lockBld := svc.PubLock.BeginMutex() var compBlks []jcstypes.ObjectBlock var compBlkSpaces []jcstypes.UserSpaceDetail @@ -706,7 +705,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID jcstypes.ObjectID, in lockBld.UserSpace().Buzy(stg.UserSpace.UserSpaceID) } - mutex, err := lockBld.MutexLock(svc.PubLock) + mutex, err := lockBld.Lock() if err != nil { return jcstypes.Object{}, fmt.Errorf("acquire lock: %w", err) } diff --git a/client/internal/services/service.go b/client/internal/services/service.go index d0707c6..17d9350 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -19,7 +19,7 @@ import ( // Service 结构体封装了分布锁服务和任务管理服务。 type Service struct { - PubLock *publock.Service + PubLock *publock.PubLock Downloader *downloader.Downloader AccessStat *accessstat.AccessStat Uploader *uploader.Uploader @@ -36,7 +36,7 @@ type Service struct { } func NewService( - publock *publock.Service, + publock *publock.PubLock, downloader *downloader.Downloader, accStat *accessstat.AccessStat, uploder *uploader.Uploader, diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 06338af..8be221d 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -13,7 +13,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" @@ -200,9 +199,9 @@ func (svc *UserSpaceService) DownloadPackage(packageID jcstypes.PackageID, users return err } - mutex, err := reqbuilder.NewBuilder(). - UserSpace().Buzy(userspaceID). - MutexLock(svc.PubLock) + mutex, err := svc.PubLock.BeginMutex(). + UserSpace().Buzy(userspaceID).End(). + Lock() if err != nil { return fmt.Errorf("acquire locks failed, err: %w", err) } diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index 1193b3c..bc7556e 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -8,7 +8,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" "gitlink.org.cn/cloudream/jcs-pub/common/types/datamap" ) @@ -79,7 +78,7 @@ loop: break loop } - lock, err := reqbuilder.NewBuilder().Package().Buzy(id).MutexLock(t.pubLock, publock.WithTimeout(time.Second*10)) + lock, err := t.pubLock.BeginMutex().Package().Buzy(id).End().Lock(publock.WithTimeout(time.Second * 10)) if err != nil { log.Warnf("lock package: %v", err) continue @@ -169,11 +168,10 @@ func (j *ChangeRedundancy) changeOne(ctx *changeRedundancyContext, pkg jcstypes. continue } - reqBlder := reqbuilder.NewBuilder() for _, space := range selectedSpaces { - reqBlder.UserSpace().Buzy(space.UserSpace.UserSpace.UserSpaceID) + reen.UserSpace().Buzy(space.UserSpace.UserSpace.UserSpaceID) } - err := reen.Lock(reqBlder.Build()) + err := reen.Lock() if err != nil { log.WithField("ObjectID", obj.Object.ObjectID).Warnf("acquire lock: %s", err.Error()) continue diff --git a/client/internal/ticktock/redundancy_shrink.go b/client/internal/ticktock/redundancy_shrink.go index 9299c04..ca8518a 100644 --- a/client/internal/ticktock/redundancy_shrink.go +++ b/client/internal/ticktock/redundancy_shrink.go @@ -15,7 +15,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/consts" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" @@ -918,11 +917,10 @@ func (t *ChangeRedundancy) generateSysEventForECObject(solu annealingSolution, o } func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[jcstypes.UserSpaceID]bool, reen *publock.Reentrant) (exec.PlanResult, error) { - reqBlder := reqbuilder.NewBuilder() for id, _ := range planningSpaceIDs { - reqBlder.UserSpace().Buzy(id) + reen.UserSpace().Buzy(id) } - err := reen.Lock(reqBlder.Build()) + err := reen.Lock() if err != nil { return exec.PlanResult{}, fmt.Errorf("locking shard resources: %w", err) } diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index c15f52e..a081432 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -32,11 +32,11 @@ type TickTock struct { spaceMeta *metacache.UserSpaceMeta stgPool *pool.Pool evtPub *sysevent.Publisher - pubLock *publock.Service + pubLock *publock.PubLock speedStats *speedstats.SpeedStats } -func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.Service, speedStats *speedstats.SpeedStats) *TickTock { +func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.PubLock, speedStats *speedstats.SpeedStats) *TickTock { sch, _ := gocron.NewScheduler() t := &TickTock{ cfg: cfg, diff --git a/client/internal/ticktock/user_space_gc.go b/client/internal/ticktock/user_space_gc.go index 1c879fb..330b369 100644 --- a/client/internal/ticktock/user_space_gc.go +++ b/client/internal/ticktock/user_space_gc.go @@ -8,8 +8,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" - - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" ) type UserSpaceGC struct { @@ -47,7 +45,7 @@ func (j *UserSpaceGC) Execute(t *TickTock) { func (j *UserSpaceGC) gcOne(t *TickTock, space *jcstypes.UserSpaceDetail) { log := logger.WithType[UserSpaceGC]("Event") - mutex, err := reqbuilder.NewBuilder().UserSpace().GC(space.UserSpace.UserSpaceID).MutexLock(t.pubLock) + mutex, err := t.pubLock.BeginMutex().UserSpace().GC(space.UserSpace.UserSpaceID).End().Lock() if err != nil { log.Warnf("acquire lock: %v", err) return diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index 2577e77..67eca83 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -14,7 +14,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" @@ -26,14 +25,14 @@ import ( ) type Uploader struct { - pubLock *publock.Service + pubLock *publock.PubLock connectivity *connectivity.Collector stgPool *pool.Pool spaceMeta *metacache.UserSpaceMeta db *db.DB } -func NewUploader(pubLock *publock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { +func NewUploader(pubLock *publock.PubLock, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { return &Uploader{ pubLock: pubLock, connectivity: connectivity, @@ -99,7 +98,7 @@ func (u *Uploader) BeginUpdate(pkgID jcstypes.PackageID, affinity jcstypes.UserS target := u.chooseUploadStorage(uploadSpaces, affinity) // 防止上传的副本被清除 - pubLock, err := reqbuilder.NewBuilder().UserSpace().Buzy(target.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + pubLock, err := u.pubLock.BeginMutex().UserSpace().Buzy(target.Space.UserSpace.UserSpaceID).End().Lock() if err != nil { return nil, fmt.Errorf("acquire lock: %w", err) } @@ -160,11 +159,11 @@ func (u *Uploader) BeginCreateUpload(bktID jcstypes.BucketID, pkgName string, co return nil, fmt.Errorf("create package: %w", err) } - reqBld := reqbuilder.NewBuilder() + reqBld := u.pubLock.BeginMutex() for _, stg := range spacesStgs { reqBld.UserSpace().Buzy(stg.UserSpace.UserSpaceID) } - lock, err := reqBld.MutexLock(u.pubLock) + lock, err := reqBld.Lock() if err != nil { return nil, fmt.Errorf("acquire lock: %w", err) } @@ -244,7 +243,7 @@ func (u *Uploader) UploadPart(objID jcstypes.ObjectID, index int, stream io.Read space = u.chooseUploadStorage(userStgs, 0).Space } - lock, err := reqbuilder.NewBuilder().UserSpace().Buzy(space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + lock, err := u.pubLock.BeginMutex().UserSpace().Buzy(space.UserSpace.UserSpaceID).End().Lock() if err != nil { return fmt.Errorf("acquire lock: %w", err) } diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index bb6bae8..fa92f9e 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -9,7 +9,6 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" - "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" @@ -105,7 +104,7 @@ func (u *Uploader) UserSpaceUpload(userSpaceID jcstypes.UserSpaceID, rootPath jc return nil, fmt.Errorf("getting base store: %w", err) } - mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(targetSapce.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + mutex, err := u.pubLock.BeginMutex().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(targetSapce.Space.UserSpace.UserSpaceID).End().Lock() if err != nil { delPkg() return nil, fmt.Errorf("acquire lock: %w", err) diff --git a/common/pkgs/rpc/channel.go b/common/pkgs/rpc/channel.go index f4ac0c7..2424c92 100644 --- a/common/pkgs/rpc/channel.go +++ b/common/pkgs/rpc/channel.go @@ -1,6 +1,8 @@ package rpc import ( + "sync" + "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" ) @@ -50,6 +52,7 @@ type bidChanClient[Recv, Send any] struct { cli BidChannelAPIClient cancelFn func() lastErr *CodeError + lock sync.Mutex } func NewBidChanClient[Recv, Send any](cli BidChannelAPIClient, cancelFn func()) BidChan[Recv, Send] { @@ -57,8 +60,12 @@ func NewBidChanClient[Recv, Send any](cli BidChannelAPIClient, cancelFn func()) } func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError { + c.lock.Lock() + defer c.lock.Unlock() + if c.lastErr != nil { - return c.lastErr + err := c.lastErr + return err } data, err := serder.ObjectToJSONEx(val) @@ -67,11 +74,17 @@ func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError { c.lastErr = Failed(ecode.OperationFailed, err.Error()) return Failed(ecode.OperationFailed, err.Error()) } + c.lock.Unlock() err = c.cli.Send(&Request{Payload: data}) + + c.lock.Lock() + if err != nil { c.cancelFn() - c.lastErr = getCodeError(err) + if c.lastErr == nil { + c.lastErr = getCodeError(err) + } return c.lastErr } @@ -79,31 +92,50 @@ func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError { } func (c *bidChanClient[Recv, Send]) Receive() (Recv, *CodeError) { + c.lock.Lock() + defer c.lock.Unlock() + if c.lastErr != nil { var def Recv return def, c.lastErr } + c.lock.Unlock() resp, err := c.cli.Recv() + c.lock.Lock() + if err != nil { c.cancelFn() - c.lastErr = getCodeError(err) + + cerr := getCodeError(err) + if c.lastErr == nil { + c.lastErr = cerr + } + var def Recv - return def, c.lastErr + return def, cerr } resp2, err := serder.JSONToObjectEx[Recv](resp.Payload) if err != nil { c.cancelFn() - c.lastErr = Failed(ecode.OperationFailed, err.Error()) + + cerr := Failed(ecode.OperationFailed, err.Error()) + if c.lastErr == nil { + c.lastErr = cerr + } + var def Recv - return def, c.lastErr + return def, cerr } return resp2, nil } func (c *bidChanClient[Recv, Send]) Close() { + c.lock.Lock() + defer c.lock.Unlock() + if c.lastErr != nil { return } @@ -113,6 +145,9 @@ func (c *bidChanClient[Recv, Send]) Close() { } func (c *bidChanClient[Recv, Send]) CloseWithError(err *CodeError) { + c.lock.Lock() + defer c.lock.Unlock() + if c.lastErr != nil { return } @@ -125,6 +160,7 @@ type bidChanServer[Recv, Send any] struct { svr BidChannelAPIServer errChan chan *CodeError lastErr *CodeError + lock sync.Mutex } func NewBidChanServer[Recv, Send any](svr BidChannelAPIServer, errChan chan *CodeError) BidChan[Recv, Send] { @@ -132,6 +168,9 @@ func NewBidChanServer[Recv, Send any](svr BidChannelAPIServer, errChan chan *Cod } func (s *bidChanServer[Recv, Send]) Send(val Send) *CodeError { + s.lock.Lock() + defer s.lock.Unlock() + if s.lastErr != nil { return s.lastErr } @@ -140,43 +179,67 @@ func (s *bidChanServer[Recv, Send]) Send(val Send) *CodeError { if err != nil { s.lastErr = Failed(ecode.OperationFailed, err.Error()) s.errChan <- s.lastErr - return Failed(ecode.OperationFailed, err.Error()) + return s.lastErr } + + s.lock.Unlock() err = s.svr.Send(&Response{Payload: data}) + s.lock.Lock() + if err != nil { - s.lastErr = getCodeError(err) - s.errChan <- s.lastErr - return s.lastErr + cerr := getCodeError(err) + if s.lastErr == nil { + s.lastErr = cerr + s.errChan <- cerr + } + return cerr } return nil } func (s *bidChanServer[Recv, Send]) Receive() (Recv, *CodeError) { + s.lock.Lock() + defer s.lock.Unlock() + if s.lastErr != nil { var def Recv return def, s.lastErr } + s.lock.Unlock() req, err := s.svr.Recv() + s.lock.Lock() + if err != nil { - s.lastErr = getCodeError(err) - s.errChan <- s.lastErr + cerr := getCodeError(err) + if s.lastErr == nil { + s.lastErr = cerr + s.errChan <- cerr + } + var def Recv - return def, s.lastErr + return def, cerr } req2, err := serder.JSONToObjectEx[Recv](req.Payload) if err != nil { - s.lastErr = Failed(ecode.OperationFailed, err.Error()) - s.errChan <- s.lastErr + cerr := Failed(ecode.OperationFailed, err.Error()) + if s.lastErr == nil { + s.lastErr = cerr + s.errChan <- cerr + } + var def Recv - return def, s.lastErr + return def, cerr } return req2, nil } func (s *bidChanServer[Recv, Send]) Close() { + s.lock.Lock() + defer s.lock.Unlock() + if s.lastErr != nil { return } @@ -186,6 +249,9 @@ func (s *bidChanServer[Recv, Send]) Close() { } func (s *bidChanServer[Recv, Send]) CloseWithError(err *CodeError) { + s.lock.Lock() + defer s.lock.Unlock() + if s.lastErr != nil { return }