Browse Source

PubLock支持主从模式

master
Sydonian 3 months ago
parent
commit
444c3bb738
24 changed files with 567 additions and 229 deletions
  1. +3
    -1
      client/internal/cmdline/serve.go
  2. +3
    -1
      client/internal/cmdline/test.go
  3. +3
    -1
      client/internal/cmdline/vfstest.go
  4. +1
    -1
      client/internal/downloader/iterator.go
  5. +1
    -2
      client/internal/http/v1/package.go
  6. +1
    -2
      client/internal/http/v1/pub_shards.go
  7. +197
    -0
      client/internal/publock/core.go
  8. +23
    -4
      client/internal/publock/mutex.go
  9. +13
    -9
      client/internal/publock/reentrant.go
  10. +5
    -20
      client/internal/publock/reqbuilder/lock_request_builder.go
  11. +9
    -9
      client/internal/publock/reqbuilder/package.go
  12. +9
    -9
      client/internal/publock/reqbuilder/user_space.go
  13. +165
    -122
      client/internal/publock/service.go
  14. +30
    -0
      client/internal/publock/types/channel.go
  15. +2
    -3
      client/internal/services/object.go
  16. +2
    -2
      client/internal/services/service.go
  17. +3
    -4
      client/internal/services/user_space.go
  18. +3
    -5
      client/internal/ticktock/change_redundancy.go
  19. +2
    -4
      client/internal/ticktock/redundancy_shrink.go
  20. +2
    -2
      client/internal/ticktock/ticktock.go
  21. +1
    -3
      client/internal/ticktock/user_space_gc.go
  22. +6
    -7
      client/internal/uploader/uploader.go
  23. +1
    -2
      client/internal/uploader/user_space_upload.go
  24. +82
    -16
      common/pkgs/rpc/channel.go

+ 3
- 1
client/internal/cmdline/serve.go View File

@@ -164,7 +164,9 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
conCol.CollectInPlace()

// 公共锁
publock := publock.NewService()
publock := publock.NewMaster()
publock.Start()
defer publock.Stop()

// 访问统计
acStat := accessstat.NewAccessStat(accessstat.Config{


+ 3
- 1
client/internal/cmdline/test.go View File

@@ -175,7 +175,9 @@ func test(configPath string) {
conCol.CollectInPlace()

// 公共锁
publock := publock.NewService()
publock := publock.NewMaster()
publock.Start()
defer publock.Stop()

// 访问统计
acStat := accessstat.NewAccessStat(accessstat.Config{


+ 3
- 1
client/internal/cmdline/vfstest.go View File

@@ -155,7 +155,9 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
conCol.CollectInPlace()

// 公共锁
publock := publock.NewService()
publock := publock.NewMaster()
publock.Start()
defer publock.Stop()

// 访问统计
acStat := accessstat.NewAccessStat(accessstat.Config{


+ 1
- 1
client/internal/downloader/iterator.go View File

@@ -29,7 +29,7 @@ type downloadSpaceInfo struct {
}

type DownloadContext struct {
PubLock *publock.Service
PubLock *publock.PubLock
}
type DownloadObjectIterator struct {
OnClosing func()


+ 1
- 2
client/internal/http/v1/package.go View File

@@ -19,7 +19,6 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator"
@@ -400,7 +399,7 @@ func (s *PackageService) Pin(ctx *gin.Context) {
return
}

lock, err := reqbuilder.NewBuilder().Package().Pin(req.PackageID).MutexLock(s.svc.PubLock)
lock, err := s.svc.PubLock.BeginMutex().Package().Pin(req.PackageID).End().Lock()
if err != nil {
ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "lock package: %v", err))
return


+ 1
- 2
client/internal/http/v1/pub_shards.go View File

@@ -16,7 +16,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
@@ -228,7 +227,7 @@ func (s *PubShardsService) ExportPackage(ctx *gin.Context) {
return
}

lock, err := reqbuilder.NewBuilder().Package().Pin(req.PackageID).MutexLock(s.svc.PubLock)
lock, err := s.svc.PubLock.BeginMutex().Package().Pin(req.PackageID).End().Lock()
if err != nil {
ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "lock package: %v", err))
return


+ 197
- 0
client/internal/publock/core.go View File

@@ -0,0 +1,197 @@
package publock

import (
"context"
"fmt"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/trie"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/lockprovider"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
)

type Core struct {
lock *sync.Mutex
provdersTrie *trie.Trie[types.LockProvider]
acquirings []*acquireInfo
acquired map[types.RequestID]types.LockRequest
nextReqID int64
}

func NewCore() *Core {
svc := &Core{
lock: &sync.Mutex{},
provdersTrie: trie.NewTrie[types.LockProvider](),
acquired: make(map[types.RequestID]types.LockRequest),
}

svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock()
svc.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock()
return svc
}

func (s *Core) Start() {

}

func (s *Core) Stop() {

}

type acquireInfo struct {
Request types.LockRequest
Callback *future.SetValueFuture[types.RequestID]
LastErr error
}

func (svc *Core) Acquire(req types.LockRequest, opts ...AcquireOptionFn) (LockedRequest, error) {
var opt = AcquireOption{
Timeout: time.Second * 10,
}
for _, fn := range opts {
fn(&opt)
}

ctx := context.Background()
if opt.Timeout != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, opt.Timeout)
defer cancel()
}

// 就地检测锁是否可用
svc.lock.Lock()
defer svc.lock.Unlock()

reqID, err := svc.tryAcquireOne(req)
if err != nil {
return LockedRequest{}, err
}

if reqID != "" {
svc.acquired[reqID] = req
return LockedRequest{
Req: req,
ReqID: reqID,
}, nil
}

// 就地检测失败,那么就需要异步等待锁可用
info := &acquireInfo{
Request: req,
Callback: future.NewSetValue[types.RequestID](),
}
svc.acquirings = append(svc.acquirings, info)

// 等待的时候不加锁
svc.lock.Unlock()
reqID, err = info.Callback.Wait(ctx)
svc.lock.Lock()

if err == nil {
svc.acquired[reqID] = req
return LockedRequest{
Req: req,
ReqID: reqID,
}, nil
}

if err != future.ErrCanceled {
lo2.Remove(svc.acquirings, info)
return LockedRequest{}, err
}

// 如果第一次等待是超时错误,那么在锁里再尝试获取一次结果
reqID, err = info.Callback.TryGetValue()
if err == nil {
svc.acquired[reqID] = req
return LockedRequest{
Req: req,
ReqID: reqID,
}, nil
}

lo2.Remove(svc.acquirings, info)
return LockedRequest{}, err
}

func (s *Core) release(reqID types.RequestID) {
s.lock.Lock()
defer s.lock.Unlock()

req, ok := s.acquired[reqID]
if !ok {
return
}

s.releaseRequest(reqID, req)
s.tryAcquirings()
}

func (a *Core) tryAcquirings() {
for i := 0; i < len(a.acquirings); i++ {
req := a.acquirings[i]

reqID, err := a.tryAcquireOne(req.Request)
if err != nil {
req.LastErr = err
continue
}

req.Callback.SetValue(reqID)
a.acquirings[i] = nil
}

a.acquirings = lo2.RemoveAllDefault(a.acquirings)
}

func (s *Core) tryAcquireOne(req types.LockRequest) (types.RequestID, error) {
err := s.testOneRequest(req)
if err != nil {
return "", err
}

reqID := types.RequestID(fmt.Sprintf("%d", s.nextReqID))
s.nextReqID++

s.applyRequest(reqID, req)
return reqID, nil
}

func (s *Core) testOneRequest(req types.LockRequest) error {
for _, lock := range req.Locks {
n, ok := s.provdersTrie.WalkEnd(lock.Path)
if !ok || n.Value == nil {
return fmt.Errorf("lock provider not found for path %v", lock.Path)
}

err := n.Value.CanLock(lock)
if err != nil {
return err
}
}

return nil
}

func (s *Core) applyRequest(reqID types.RequestID, req types.LockRequest) {
for _, lock := range req.Locks {
p, _ := s.provdersTrie.WalkEnd(lock.Path)
p.Value.Lock(reqID, lock)
}
}

func (s *Core) releaseRequest(reqID types.RequestID, req types.LockRequest) {
for _, lock := range req.Locks {
p, _ := s.provdersTrie.WalkEnd(lock.Path)
p.Value.Unlock(reqID, lock)
}
}

type LockedRequest struct {
Req types.LockRequest
ReqID types.RequestID
}

+ 23
- 4
client/internal/publock/mutex.go View File

@@ -1,15 +1,34 @@
package publock

import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
)

type Mutex struct {
svc *Service
lockReq types.LockRequest
lockReqID types.RequestID
pub *PubLock
locked LockedRequest
}

func (m *Mutex) Unlock() {
m.svc.release(m.lockReqID, m.lockReq)
m.pub.release(m.locked.ReqID)
}

type MutexBuilder struct {
reqbuilder.LockRequestBuilder[*MutexBuilder]
pub *PubLock
}

func (b *MutexBuilder) Lock(opt ...AcquireOptionFn) (*Mutex, error) {
lkd, err := b.pub.acquire(types.LockRequest{
Locks: b.Locks,
}, opt...)
if err != nil {
return nil, err
}

return &Mutex{
pub: b.pub,
locked: lkd,
}, nil
}

+ 13
- 9
client/internal/publock/reentrant.go View File

@@ -1,18 +1,22 @@
package publock

import "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
)

type Reentrant struct {
svc *Service
reqbuilder.LockRequestBuilder[*Reentrant]
p *PubLock
reqs []types.LockRequest
locked []*Mutex
locked []LockedRequest
}

func (r *Reentrant) Lock(req types.LockRequest, opt ...AcquireOptionFn) error {
func (r *Reentrant) Lock(opt ...AcquireOptionFn) error {
var willLock []types.Lock

loop:
for _, lock := range req.Locks {
for _, lock := range r.LockRequestBuilder.Locks {
for _, req := range r.reqs {
for _, locked := range req.Locks {
if locked.Equals(lock) {
@@ -23,17 +27,17 @@ loop:

willLock = append(willLock, lock)
}
r.LockRequestBuilder.Locks = nil

if len(willLock) == 0 {
return nil
}

newReq := types.LockRequest{
Reason: req.Reason,
Locks: willLock,
Locks: willLock,
}

m, err := r.svc.Acquire(newReq, opt...)
m, err := r.p.acquire(newReq, opt...)
if err != nil {
return err
}
@@ -46,7 +50,7 @@ loop:

func (r *Reentrant) Unlock() {
for i := len(r.reqs) - 1; i >= 0; i-- {
r.locked[i].Unlock()
r.p.release(r.locked[i].ReqID)
}
r.locked = nil
r.reqs = nil


+ 5
- 20
client/internal/publock/reqbuilder/lock_request_builder.go View File

@@ -1,29 +1,14 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
)

type LockRequestBuilder struct {
locks []types.Lock
type LockRequestBuilder[T any] struct {
Locks []types.Lock
T T
}

func NewBuilder() *LockRequestBuilder {
return &LockRequestBuilder{}
}

func (b *LockRequestBuilder) IsEmpty() bool {
return len(b.locks) == 0
}

func (b *LockRequestBuilder) Build() types.LockRequest {
return types.LockRequest{
Locks: lo2.ArrayClone(b.locks),
}
}

func (b *LockRequestBuilder) MutexLock(svc *publock.Service, opt ...publock.AcquireOptionFn) (*publock.Mutex, error) {
return svc.Acquire(b.Build(), opt...)
func (b *LockRequestBuilder[T]) End() T {
return b.T
}

+ 9
- 9
client/internal/publock/reqbuilder/package.go View File

@@ -8,15 +8,15 @@ import (
jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
)

type PackageLockReqBuilder struct {
*LockRequestBuilder
type PackageLockReqBuilder[T any] struct {
*LockRequestBuilder[T]
}

func (b *LockRequestBuilder) Package() *PackageLockReqBuilder {
return &PackageLockReqBuilder{LockRequestBuilder: b}
func (b *LockRequestBuilder[T]) Package() *PackageLockReqBuilder[T] {
return &PackageLockReqBuilder[T]{LockRequestBuilder: b}
}
func (b *PackageLockReqBuilder) Buzy(pkgID jcstypes.PackageID) *PackageLockReqBuilder {
b.locks = append(b.locks, types.Lock{
func (b *PackageLockReqBuilder[T]) Buzy(pkgID jcstypes.PackageID) *PackageLockReqBuilder[T] {
b.Locks = append(b.Locks, types.Lock{
Path: b.makePath(pkgID),
Name: lockprovider.PackageBuzyLock,
Target: lockprovider.NewEmptyTarget(),
@@ -24,8 +24,8 @@ func (b *PackageLockReqBuilder) Buzy(pkgID jcstypes.PackageID) *PackageLockReqBu
return b
}

func (b *PackageLockReqBuilder) Pin(pkgID jcstypes.PackageID) *PackageLockReqBuilder {
b.locks = append(b.locks, types.Lock{
func (b *PackageLockReqBuilder[T]) Pin(pkgID jcstypes.PackageID) *PackageLockReqBuilder[T] {
b.Locks = append(b.Locks, types.Lock{
Path: b.makePath(pkgID),
Name: lockprovider.PackagePinLock,
Target: lockprovider.NewEmptyTarget(),
@@ -33,6 +33,6 @@ func (b *PackageLockReqBuilder) Pin(pkgID jcstypes.PackageID) *PackageLockReqBui
return b
}

func (b *PackageLockReqBuilder) makePath(pkgID jcstypes.PackageID) []string {
func (b *PackageLockReqBuilder[T]) makePath(pkgID jcstypes.PackageID) []string {
return []string{lockprovider.PackageLockPathPrefix, strconv.FormatInt(int64(pkgID), 10)}
}

+ 9
- 9
client/internal/publock/reqbuilder/user_space.go View File

@@ -8,15 +8,15 @@ import (
jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
)

type UserSpaceLockReqBuilder struct {
*LockRequestBuilder
type UserSpaceLockReqBuilder[T any] struct {
*LockRequestBuilder[T]
}

func (b *LockRequestBuilder) UserSpace() *UserSpaceLockReqBuilder {
return &UserSpaceLockReqBuilder{LockRequestBuilder: b}
func (b *LockRequestBuilder[T]) UserSpace() *UserSpaceLockReqBuilder[T] {
return &UserSpaceLockReqBuilder[T]{LockRequestBuilder: b}
}
func (b *UserSpaceLockReqBuilder) Buzy(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder {
b.locks = append(b.locks, types.Lock{
func (b *UserSpaceLockReqBuilder[T]) Buzy(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder[T] {
b.Locks = append(b.Locks, types.Lock{
Path: b.makePath(spaceID),
Name: lockprovider.UserSpaceBuzyLock,
Target: lockprovider.NewEmptyTarget(),
@@ -24,8 +24,8 @@ func (b *UserSpaceLockReqBuilder) Buzy(spaceID jcstypes.UserSpaceID) *UserSpaceL
return b
}

func (b *UserSpaceLockReqBuilder) GC(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder {
b.locks = append(b.locks, types.Lock{
func (b *UserSpaceLockReqBuilder[T]) GC(spaceID jcstypes.UserSpaceID) *UserSpaceLockReqBuilder[T] {
b.Locks = append(b.Locks, types.Lock{
Path: b.makePath(spaceID),
Name: lockprovider.UserSpaceGCLock,
Target: lockprovider.NewEmptyTarget(),
@@ -33,6 +33,6 @@ func (b *UserSpaceLockReqBuilder) GC(spaceID jcstypes.UserSpaceID) *UserSpaceLoc
return b
}

func (b *UserSpaceLockReqBuilder) makePath(hubID jcstypes.UserSpaceID) []string {
func (b *UserSpaceLockReqBuilder[T]) makePath(hubID jcstypes.UserSpaceID) []string {
return []string{lockprovider.UserSpaceLockPathPrefix, strconv.FormatInt(int64(hubID), 10)}
}

+ 165
- 122
client/internal/publock/service.go View File

@@ -7,14 +7,14 @@ import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/trie"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/lockprovider"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
)

type AcquireOption struct {
Timeout time.Duration
Reason string
}

type AcquireOptionFn func(opt *AcquireOption)
@@ -25,171 +25,214 @@ func WithTimeout(timeout time.Duration) AcquireOptionFn {
}
}

type Service struct {
lock *sync.Mutex
provdersTrie *trie.Trie[types.LockProvider]
acquirings []*acquireInfo
nextReqID int64
func WithReason(reason string) AcquireOptionFn {
return func(opt *AcquireOption) {
opt.Reason = reason
}
}

func NewService() *Service {
svc := &Service{
lock: &sync.Mutex{},
provdersTrie: trie.NewTrie[types.LockProvider](),
}
type PubLock struct {
core *Core
cliCli *clirpc.Client
pubChan clirpc.PubLockMessageChan
lock sync.Mutex
acquirings map[string]*acquireInfo
releasing map[string]*releaseInfo
nextCtxID int64
}

svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock()
svc.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock()
return svc
func NewMaster() *PubLock {
core := NewCore()
return &PubLock{
core: core,
}
}

type acquireInfo struct {
Request types.LockRequest
Callback *future.SetValueFuture[types.RequestID]
LastErr error
func NewSlave(cli *clirpc.Client) *PubLock {
return &PubLock{
cliCli: cli,
acquirings: make(map[string]*acquireInfo),
releasing: make(map[string]*releaseInfo),
}
}

func (svc *Service) Acquire(req types.LockRequest, opts ...AcquireOptionFn) (*Mutex, error) {
var opt = AcquireOption{
Timeout: time.Second * 10,
func (s *PubLock) BeginReentrant() *Reentrant {
r := &Reentrant{
p: s,
}
for _, fn := range opts {
fn(&opt)
r.T = r
return r
}

func (p *PubLock) BeginMutex() *MutexBuilder {
m := &MutexBuilder{
pub: p,
}
m.T = m
return m
}

ctx := context.Background()
if opt.Timeout != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, opt.Timeout)
defer cancel()
func (p *PubLock) Start() {
if p.core != nil {
p.core.Start()
return
}
}

// 就地检测锁是否可用
svc.lock.Lock()
defer svc.lock.Unlock()
func (p *PubLock) Stop() {
p.lock.Lock()
defer p.lock.Unlock()

reqID, err := svc.tryAcquireOne(req)
if err != nil {
return nil, err
if p.core != nil {
p.core.Stop()
return
}

if reqID != "" {
return &Mutex{
svc: svc,
lockReq: req,
lockReqID: reqID,
}, nil
if p.pubChan != nil {
p.pubChan.Close()
p.pubChan = nil
}

// 就地检测失败,那么就需要异步等待锁可用
info := &acquireInfo{
Request: req,
Callback: future.NewSetValue[types.RequestID](),
}
svc.acquirings = append(svc.acquirings, info)
p.cliCli.Release()
p.cliCli = nil
}

// 等待的时候不加锁
svc.lock.Unlock()
reqID, err = info.Callback.Wait(ctx)
svc.lock.Lock()
func (p *PubLock) acquire(req types.LockRequest, opts ...AcquireOptionFn) (LockedRequest, error) {
p.lock.Lock()

if err == nil {
return &Mutex{
svc: svc,
lockReq: req,
lockReqID: reqID,
}, nil
if p.core != nil {
p.lock.Unlock()
return p.core.Acquire(req, opts...)
}

if err != future.ErrCanceled {
lo2.Remove(svc.acquirings, info)
return nil, err
if p.pubChan == nil {
p.pubChan = p.cliCli.PubLockChannel(context.Background())
go p.receivingChan()
}

// 如果第一次等待是超时错误,那么在锁里再尝试获取一次结果
reqID, err = info.Callback.TryGetValue()
if err == nil {
return &Mutex{
svc: svc,
lockReq: req,
lockReqID: reqID,
}, nil
acqID := fmt.Sprintf("%v", p.nextCtxID)
p.nextCtxID++

cerr := p.pubChan.Send(&types.AcquireMsg{ContextID: acqID, Request: req})
if cerr != nil {
p.lock.Unlock()
return LockedRequest{}, cerr.ToError()
}

lo2.Remove(svc.acquirings, info)
return nil, err
}
callback := future.NewSetValue[types.RequestID]()
info := &acquireInfo{
Request: req,
Callback: callback,
}
p.acquirings[acqID] = info
p.lock.Unlock()

func (s *Service) BeginReentrant() *Reentrant {
return &Reentrant{
svc: s,
reqID, err := callback.Wait(context.Background())
if err != nil {
return LockedRequest{}, err
}

return LockedRequest{
Req: req,
ReqID: reqID,
}, nil
}

func (s *Service) release(reqID types.RequestID, req types.LockRequest) {
s.lock.Lock()
defer s.lock.Unlock()
func (p *PubLock) release(reqID types.RequestID) {
log := logger.WithField("Mod", "PubLock")

s.releaseRequest(reqID, req)
s.tryAcquirings()
}
p.lock.Lock()

func (a *Service) tryAcquirings() {
for i := 0; i < len(a.acquirings); i++ {
req := a.acquirings[i]
if p.core != nil {
p.lock.Unlock()
p.core.release(reqID)
return
}

reqID, err := a.tryAcquireOne(req.Request)
if err != nil {
req.LastErr = err
continue
}
if p.pubChan == nil {
p.pubChan = p.cliCli.PubLockChannel(context.Background())
go p.receivingChan()
}

req.Callback.SetValue(reqID)
a.acquirings[i] = nil
relID := fmt.Sprintf("%v", p.nextCtxID)
p.nextCtxID++

cerr := p.pubChan.Send(&types.ReleaseMsg{ContextID: relID, RequestID: reqID})
if cerr != nil {
p.lock.Unlock()
log.Warnf("unlock %v: %v", reqID, cerr.ToError())
return
}

a.acquirings = lo2.RemoveAllDefault(a.acquirings)
}
callback := future.NewSetVoid()
info := &releaseInfo{
RequestID: reqID,
Callback: callback,
}
p.releasing[relID] = info
p.lock.Unlock()

func (s *Service) tryAcquireOne(req types.LockRequest) (types.RequestID, error) {
err := s.testOneRequest(req)
err := callback.Wait(context.Background())
if err != nil {
return "", err
log.Warnf("unlock %v: %v", reqID, err)
return
}

reqID := types.RequestID(fmt.Sprintf("%d", s.nextReqID))
s.nextReqID++

s.applyRequest(reqID, req)
return reqID, nil
log.Tracef("unlock %v", reqID)
}

func (s *Service) testOneRequest(req types.LockRequest) error {
for _, lock := range req.Locks {
n, ok := s.provdersTrie.WalkEnd(lock.Path)
if !ok || n.Value == nil {
return fmt.Errorf("lock provider not found for path %v", lock.Path)
func (p *PubLock) receivingChan() {
log := logger.WithField("Mod", "PubLock")
for {
msg, cerr := p.pubChan.Receive()
if cerr != nil {
p.lock.Lock()
for _, info := range p.acquirings {
info.Callback.SetError(cerr.ToError())
}
p.acquirings = make(map[string]*acquireInfo)

for _, info := range p.releasing {
info.Callback.SetError(cerr.ToError())
}
p.releasing = make(map[string]*releaseInfo)
p.lock.Unlock()

log.Warnf("receive channel error: %v", cerr.ToError())
return
}

err := n.Value.CanLock(lock)
if err != nil {
return err
p.lock.Lock()

switch msg := msg.(type) {
case *types.AcquireResultMsg:
info, ok := p.acquirings[msg.ContextID]
if !ok {
continue
}

if msg.Success {
info.Callback.SetValue(msg.RequestID)
} else {
info.Callback.SetError(fmt.Errorf(msg.Reason))
}
delete(p.acquirings, msg.ContextID)

case *types.ReleaseResultMsg:
info, ok := p.releasing[msg.ContextID]
if !ok {
continue
}

info.Callback.SetVoid()
delete(p.releasing, msg.ContextID)
}
}

return nil
}

func (s *Service) applyRequest(reqID types.RequestID, req types.LockRequest) {
for _, lock := range req.Locks {
p, _ := s.provdersTrie.WalkEnd(lock.Path)
p.Value.Lock(reqID, lock)
p.lock.Unlock()
}
}

func (s *Service) releaseRequest(reqID types.RequestID, req types.LockRequest) {
for _, lock := range req.Locks {
p, _ := s.provdersTrie.WalkEnd(lock.Path)
p.Value.Unlock(reqID, lock)
}
type releaseInfo struct {
RequestID types.RequestID
Callback *future.SetVoidFuture
}

+ 30
- 0
client/internal/publock/types/channel.go View File

@@ -0,0 +1,30 @@
package types

type AcquireMsg struct {
ContextID string
Request LockRequest
}

func (*AcquireMsg) IsPubLockMessage() bool { return true }

type AcquireResultMsg struct {
ContextID string
Success bool
Reason string
RequestID RequestID
}

func (*AcquireResultMsg) IsPubLockMessage() bool { return true }

type ReleaseMsg struct {
ContextID string
RequestID RequestID
}

func (*ReleaseMsg) IsPubLockMessage() bool { return true }

type ReleaseResultMsg struct {
ContextID string
}

func (*ReleaseResultMsg) IsPubLockMessage() bool { return true }

+ 2
- 3
client/internal/services/object.go View File

@@ -11,7 +11,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
"gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
@@ -682,7 +681,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID jcstypes.ObjectID, in
objBlkMap[blk.Index] = blk
}

lockBld := reqbuilder.NewBuilder()
lockBld := svc.PubLock.BeginMutex()

var compBlks []jcstypes.ObjectBlock
var compBlkSpaces []jcstypes.UserSpaceDetail
@@ -706,7 +705,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID jcstypes.ObjectID, in
lockBld.UserSpace().Buzy(stg.UserSpace.UserSpaceID)
}

mutex, err := lockBld.MutexLock(svc.PubLock)
mutex, err := lockBld.Lock()
if err != nil {
return jcstypes.Object{}, fmt.Errorf("acquire lock: %w", err)
}


+ 2
- 2
client/internal/services/service.go View File

@@ -19,7 +19,7 @@ import (

// Service 结构体封装了分布锁服务和任务管理服务。
type Service struct {
PubLock *publock.Service
PubLock *publock.PubLock
Downloader *downloader.Downloader
AccessStat *accessstat.AccessStat
Uploader *uploader.Uploader
@@ -36,7 +36,7 @@ type Service struct {
}

func NewService(
publock *publock.Service,
publock *publock.PubLock,
downloader *downloader.Downloader,
accStat *accessstat.AccessStat,
uploder *uploader.Uploader,


+ 3
- 4
client/internal/services/user_space.go View File

@@ -13,7 +13,6 @@ import (

"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
@@ -200,9 +199,9 @@ func (svc *UserSpaceService) DownloadPackage(packageID jcstypes.PackageID, users
return err
}

mutex, err := reqbuilder.NewBuilder().
UserSpace().Buzy(userspaceID).
MutexLock(svc.PubLock)
mutex, err := svc.PubLock.BeginMutex().
UserSpace().Buzy(userspaceID).End().
Lock()
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}


+ 3
- 5
client/internal/ticktock/change_redundancy.go View File

@@ -8,7 +8,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
"gitlink.org.cn/cloudream/jcs-pub/common/types/datamap"
)
@@ -79,7 +78,7 @@ loop:
break loop
}

lock, err := reqbuilder.NewBuilder().Package().Buzy(id).MutexLock(t.pubLock, publock.WithTimeout(time.Second*10))
lock, err := t.pubLock.BeginMutex().Package().Buzy(id).End().Lock(publock.WithTimeout(time.Second * 10))
if err != nil {
log.Warnf("lock package: %v", err)
continue
@@ -169,11 +168,10 @@ func (j *ChangeRedundancy) changeOne(ctx *changeRedundancyContext, pkg jcstypes.
continue
}

reqBlder := reqbuilder.NewBuilder()
for _, space := range selectedSpaces {
reqBlder.UserSpace().Buzy(space.UserSpace.UserSpace.UserSpaceID)
reen.UserSpace().Buzy(space.UserSpace.UserSpace.UserSpaceID)
}
err := reen.Lock(reqBlder.Build())
err := reen.Lock()
if err != nil {
log.WithField("ObjectID", obj.Object.ObjectID).Warnf("acquire lock: %s", err.Error())
continue


+ 2
- 4
client/internal/ticktock/redundancy_shrink.go View File

@@ -15,7 +15,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
"gitlink.org.cn/cloudream/jcs-pub/common/consts"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
@@ -918,11 +917,10 @@ func (t *ChangeRedundancy) generateSysEventForECObject(solu annealingSolution, o
}

func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[jcstypes.UserSpaceID]bool, reen *publock.Reentrant) (exec.PlanResult, error) {
reqBlder := reqbuilder.NewBuilder()
for id, _ := range planningSpaceIDs {
reqBlder.UserSpace().Buzy(id)
reen.UserSpace().Buzy(id)
}
err := reen.Lock(reqBlder.Build())
err := reen.Lock()
if err != nil {
return exec.PlanResult{}, fmt.Errorf("locking shard resources: %w", err)
}


+ 2
- 2
client/internal/ticktock/ticktock.go View File

@@ -32,11 +32,11 @@ type TickTock struct {
spaceMeta *metacache.UserSpaceMeta
stgPool *pool.Pool
evtPub *sysevent.Publisher
pubLock *publock.Service
pubLock *publock.PubLock
speedStats *speedstats.SpeedStats
}

func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.Service, speedStats *speedstats.SpeedStats) *TickTock {
func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.PubLock, speedStats *speedstats.SpeedStats) *TickTock {
sch, _ := gocron.NewScheduler()
t := &TickTock{
cfg: cfg,


+ 1
- 3
client/internal/ticktock/user_space_gc.go View File

@@ -8,8 +8,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"

"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
)

type UserSpaceGC struct {
@@ -47,7 +45,7 @@ func (j *UserSpaceGC) Execute(t *TickTock) {
func (j *UserSpaceGC) gcOne(t *TickTock, space *jcstypes.UserSpaceDetail) {
log := logger.WithType[UserSpaceGC]("Event")

mutex, err := reqbuilder.NewBuilder().UserSpace().GC(space.UserSpace.UserSpaceID).MutexLock(t.pubLock)
mutex, err := t.pubLock.BeginMutex().UserSpace().GC(space.UserSpace.UserSpaceID).End().Lock()
if err != nil {
log.Warnf("acquire lock: %v", err)
return


+ 6
- 7
client/internal/uploader/uploader.go View File

@@ -14,7 +14,6 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
@@ -26,14 +25,14 @@ import (
)

type Uploader struct {
pubLock *publock.Service
pubLock *publock.PubLock
connectivity *connectivity.Collector
stgPool *pool.Pool
spaceMeta *metacache.UserSpaceMeta
db *db.DB
}

func NewUploader(pubLock *publock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
func NewUploader(pubLock *publock.PubLock, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
return &Uploader{
pubLock: pubLock,
connectivity: connectivity,
@@ -99,7 +98,7 @@ func (u *Uploader) BeginUpdate(pkgID jcstypes.PackageID, affinity jcstypes.UserS
target := u.chooseUploadStorage(uploadSpaces, affinity)

// 防止上传的副本被清除
pubLock, err := reqbuilder.NewBuilder().UserSpace().Buzy(target.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock)
pubLock, err := u.pubLock.BeginMutex().UserSpace().Buzy(target.Space.UserSpace.UserSpaceID).End().Lock()
if err != nil {
return nil, fmt.Errorf("acquire lock: %w", err)
}
@@ -160,11 +159,11 @@ func (u *Uploader) BeginCreateUpload(bktID jcstypes.BucketID, pkgName string, co
return nil, fmt.Errorf("create package: %w", err)
}

reqBld := reqbuilder.NewBuilder()
reqBld := u.pubLock.BeginMutex()
for _, stg := range spacesStgs {
reqBld.UserSpace().Buzy(stg.UserSpace.UserSpaceID)
}
lock, err := reqBld.MutexLock(u.pubLock)
lock, err := reqBld.Lock()
if err != nil {
return nil, fmt.Errorf("acquire lock: %w", err)
}
@@ -244,7 +243,7 @@ func (u *Uploader) UploadPart(objID jcstypes.ObjectID, index int, stream io.Read
space = u.chooseUploadStorage(userStgs, 0).Space
}

lock, err := reqbuilder.NewBuilder().UserSpace().Buzy(space.UserSpace.UserSpaceID).MutexLock(u.pubLock)
lock, err := u.pubLock.BeginMutex().UserSpace().Buzy(space.UserSpace.UserSpaceID).End().Lock()
if err != nil {
return fmt.Errorf("acquire lock: %w", err)
}


+ 1
- 2
client/internal/uploader/user_space_upload.go View File

@@ -9,7 +9,6 @@ import (

"github.com/samber/lo"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
@@ -105,7 +104,7 @@ func (u *Uploader) UserSpaceUpload(userSpaceID jcstypes.UserSpaceID, rootPath jc
return nil, fmt.Errorf("getting base store: %w", err)
}

mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(targetSapce.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock)
mutex, err := u.pubLock.BeginMutex().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(targetSapce.Space.UserSpace.UserSpaceID).End().Lock()
if err != nil {
delPkg()
return nil, fmt.Errorf("acquire lock: %w", err)


+ 82
- 16
common/pkgs/rpc/channel.go View File

@@ -1,6 +1,8 @@
package rpc

import (
"sync"

"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
)
@@ -50,6 +52,7 @@ type bidChanClient[Recv, Send any] struct {
cli BidChannelAPIClient
cancelFn func()
lastErr *CodeError
lock sync.Mutex
}

func NewBidChanClient[Recv, Send any](cli BidChannelAPIClient, cancelFn func()) BidChan[Recv, Send] {
@@ -57,8 +60,12 @@ func NewBidChanClient[Recv, Send any](cli BidChannelAPIClient, cancelFn func())
}

func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError {
c.lock.Lock()
defer c.lock.Unlock()

if c.lastErr != nil {
return c.lastErr
err := c.lastErr
return err
}

data, err := serder.ObjectToJSONEx(val)
@@ -67,11 +74,17 @@ func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError {
c.lastErr = Failed(ecode.OperationFailed, err.Error())
return Failed(ecode.OperationFailed, err.Error())
}
c.lock.Unlock()

err = c.cli.Send(&Request{Payload: data})

c.lock.Lock()

if err != nil {
c.cancelFn()
c.lastErr = getCodeError(err)
if c.lastErr == nil {
c.lastErr = getCodeError(err)
}
return c.lastErr
}

@@ -79,31 +92,50 @@ func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError {
}

func (c *bidChanClient[Recv, Send]) Receive() (Recv, *CodeError) {
c.lock.Lock()
defer c.lock.Unlock()

if c.lastErr != nil {
var def Recv
return def, c.lastErr
}

c.lock.Unlock()
resp, err := c.cli.Recv()
c.lock.Lock()

if err != nil {
c.cancelFn()
c.lastErr = getCodeError(err)

cerr := getCodeError(err)
if c.lastErr == nil {
c.lastErr = cerr
}

var def Recv
return def, c.lastErr
return def, cerr
}

resp2, err := serder.JSONToObjectEx[Recv](resp.Payload)
if err != nil {
c.cancelFn()
c.lastErr = Failed(ecode.OperationFailed, err.Error())

cerr := Failed(ecode.OperationFailed, err.Error())
if c.lastErr == nil {
c.lastErr = cerr
}

var def Recv
return def, c.lastErr
return def, cerr
}

return resp2, nil
}

func (c *bidChanClient[Recv, Send]) Close() {
c.lock.Lock()
defer c.lock.Unlock()

if c.lastErr != nil {
return
}
@@ -113,6 +145,9 @@ func (c *bidChanClient[Recv, Send]) Close() {
}

func (c *bidChanClient[Recv, Send]) CloseWithError(err *CodeError) {
c.lock.Lock()
defer c.lock.Unlock()

if c.lastErr != nil {
return
}
@@ -125,6 +160,7 @@ type bidChanServer[Recv, Send any] struct {
svr BidChannelAPIServer
errChan chan *CodeError
lastErr *CodeError
lock sync.Mutex
}

func NewBidChanServer[Recv, Send any](svr BidChannelAPIServer, errChan chan *CodeError) BidChan[Recv, Send] {
@@ -132,6 +168,9 @@ func NewBidChanServer[Recv, Send any](svr BidChannelAPIServer, errChan chan *Cod
}

func (s *bidChanServer[Recv, Send]) Send(val Send) *CodeError {
s.lock.Lock()
defer s.lock.Unlock()

if s.lastErr != nil {
return s.lastErr
}
@@ -140,43 +179,67 @@ func (s *bidChanServer[Recv, Send]) Send(val Send) *CodeError {
if err != nil {
s.lastErr = Failed(ecode.OperationFailed, err.Error())
s.errChan <- s.lastErr
return Failed(ecode.OperationFailed, err.Error())
return s.lastErr
}

s.lock.Unlock()
err = s.svr.Send(&Response{Payload: data})
s.lock.Lock()

if err != nil {
s.lastErr = getCodeError(err)
s.errChan <- s.lastErr
return s.lastErr
cerr := getCodeError(err)
if s.lastErr == nil {
s.lastErr = cerr
s.errChan <- cerr
}
return cerr
}

return nil
}

func (s *bidChanServer[Recv, Send]) Receive() (Recv, *CodeError) {
s.lock.Lock()
defer s.lock.Unlock()

if s.lastErr != nil {
var def Recv
return def, s.lastErr
}

s.lock.Unlock()
req, err := s.svr.Recv()
s.lock.Lock()

if err != nil {
s.lastErr = getCodeError(err)
s.errChan <- s.lastErr
cerr := getCodeError(err)
if s.lastErr == nil {
s.lastErr = cerr
s.errChan <- cerr
}

var def Recv
return def, s.lastErr
return def, cerr
}

req2, err := serder.JSONToObjectEx[Recv](req.Payload)
if err != nil {
s.lastErr = Failed(ecode.OperationFailed, err.Error())
s.errChan <- s.lastErr
cerr := Failed(ecode.OperationFailed, err.Error())
if s.lastErr == nil {
s.lastErr = cerr
s.errChan <- cerr
}

var def Recv
return def, s.lastErr
return def, cerr
}

return req2, nil
}
func (s *bidChanServer[Recv, Send]) Close() {
s.lock.Lock()
defer s.lock.Unlock()

if s.lastErr != nil {
return
}
@@ -186,6 +249,9 @@ func (s *bidChanServer[Recv, Send]) Close() {
}

func (s *bidChanServer[Recv, Send]) CloseWithError(err *CodeError) {
s.lock.Lock()
defer s.lock.Unlock()

if s.lastErr != nil {
return
}


Loading…
Cancel
Save