Browse Source

增加缓存移动功能

gitlink
Sydonian 2 years ago
parent
commit
dca2f02ab1
4 changed files with 159 additions and 10 deletions
  1. +47
    -10
      internal/services/mq/cache.go
  2. +102
    -0
      internal/task/cache_move_package.go
  3. +5
    -0
      internal/task/create_ec_package.go
  4. +5
    -0
      internal/task/create_rep_package.go

internal/services/mq/ipfs.go → internal/services/mq/cache.go View File

@@ -10,23 +10,24 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task"
mytask "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
logger.Warnf("new ipfs client: %s", err.Error())
return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "new ipfs client failed")
return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "new ipfs client failed")
}
defer ipfsCli.Close()

filesMap, err := ipfsCli.GetPinnedFiles()
if err != nil {
logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error())
return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed")
return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "get pinned files from ipfs failed")
}

// TODO 根据锁定清单过滤被锁定的文件的记录
@@ -37,7 +38,7 @@ func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.C
}
}

func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash]
@@ -61,7 +62,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she

} else if cache.State == consts.CacheStateTemp {
if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
}
}
}
@@ -69,10 +70,10 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she

// 增量情况下,不需要对filesMap中没检查的记录进行处理

return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries))
return mq.ReplyOK(agtmq.NewCheckCacheResp(entries))
}

func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash]
@@ -96,7 +97,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel

} else if cache.State == consts.CacheStateTemp {
if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
}
}
}
@@ -109,8 +110,44 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel
if err != nil {
logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error())
}
entries = append(entries, agtmq.NewCheckIPFSRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP))
entries = append(entries, agtmq.NewCheckCacheRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP))
}

return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries))
return mq.ReplyOK(agtmq.NewCheckCacheResp(entries))
}

func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID))
return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID()))
}

func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil {
return mq.ReplyFailed[agtmq.WaitCacheMovePackageResp](errorcode.TaskNotFound, "task not found")
}

if msg.WaitTimeoutMs == 0 {
tsk.Wait()

errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))

} else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {

errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, ""))
}
}

+ 102
- 0
internal/task/cache_move_package.go View File

@@ -0,0 +1,102 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type CacheMovePackage struct {
userID int64
packageID int64
}

func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage {
return &CacheMovePackage{
userID: userID,
packageID: packageID,
}
}

func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *CacheMovePackage) do(ctx TaskContext) error {
log := logger.WithType[CacheMovePackage]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

// TOOD EC的锁
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 读取Package信息和包含的Object信息
Package().ReadOne(t.packageID).Object().ReadAny().
// 读取Rep对象的配置
ObjectRep().ReadAny().
// 创建Cache记录
Cache().CreateAny().
IPFS().
// pin文件
CreateAnyRep(*globals.Local.NodeID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquiring distlock: %w", err)
}
defer mutex.Unlock()

coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
if err != nil {
return fmt.Errorf("getting package: %w", err)
}

if pkgResp.Redundancy.IsRepInfo() {
return t.moveRep(ctx, coorCli, pkgResp.Package)
} else {
// TODO EC的CacheMove逻辑
}

return nil
}
func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, pkg model.Package) error {
getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID))
if err != nil {
return fmt.Errorf("getting package object rep data: %w", err)
}

ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

var fileHashes []string
for _, rep := range getRepResp.Data {
if err := ipfsCli.Pin(rep.FileHash); err != nil {
return fmt.Errorf("pinning file %s: %w", rep.FileHash, err)
}

fileHashes = append(fileHashes, rep.FileHash)
}

_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes))
if err != nil {
return fmt.Errorf("reporting cache package moved: %w", err)
}

return nil
}

+ 5
- 0
internal/task/create_ec_package.go View File

@@ -4,6 +4,7 @@ import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
@@ -24,6 +25,10 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera
}

func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreateECPackage]("Task")
log.Debugf("begin")
defer log.Debugf("end")

ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{
UpdatePackageContext: &cmd.UpdatePackageContext{
Distlock: ctx.distlock,


+ 5
- 0
internal/task/create_rep_package.go View File

@@ -4,6 +4,7 @@ import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)
@@ -23,6 +24,10 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter
}

func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreateRepPackage]("Task")
log.Debugf("begin")
defer log.Debugf("end")

ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})


Loading…
Cancel
Save