diff --git a/internal/services/mq/ipfs.go b/internal/services/mq/cache.go similarity index 60% rename from internal/services/mq/ipfs.go rename to internal/services/mq/cache.go index fa101ae..ce76401 100644 --- a/internal/services/mq/ipfs.go +++ b/internal/services/mq/cache.go @@ -10,23 +10,24 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" + mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" ) -func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { ipfsCli, err := globals.IPFSPool.Acquire() if err != nil { logger.Warnf("new ipfs client: %s", err.Error()) - return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "new ipfs client failed") + return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "new ipfs client failed") } defer ipfsCli.Close() filesMap, err := ipfsCli.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") + return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "get pinned files from ipfs failed") } // TODO 根据锁定清单过滤被锁定的文件的记录 @@ -37,7 +38,7 @@ func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.C } } -func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, 
*mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -61,7 +62,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { - entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } } @@ -69,10 +70,10 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she // 增量情况下,不需要对filesMap中没检查的记录进行处理 - return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) + return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) } -func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -96,7 +97,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { - entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } } @@ -109,8 +110,44 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel if err != nil { logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) } - entries = append(entries, 
agtmq.NewCheckIPFSRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP)) } - return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) + return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) +} + +func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) { + tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID)) + return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID())) +} + +func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) { + tsk := svc.taskManager.FindByID(msg.TaskID) + if tsk == nil { + return mq.ReplyFailed[agtmq.WaitCacheMovePackageResp](errorcode.TaskNotFound, "task not found") + } + + if msg.WaitTimeoutMs == 0 { + tsk.Wait() + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + + } else { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) + } } diff --git a/internal/task/cache_move_package.go b/internal/task/cache_move_package.go new file mode 100644 index 0000000..26cc3a7 --- /dev/null +++ b/internal/task/cache_move_package.go @@ -0,0 +1,102 @@ +package task + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +type CacheMovePackage struct { + userID int64 + packageID int64 +} + +func 
NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { + return &CacheMovePackage{ + userID: userID, + packageID: packageID, + } +} + +func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.do(ctx) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} + +func (t *CacheMovePackage) do(ctx TaskContext) error { + log := logger.WithType[CacheMovePackage]("Task") + log.Debugf("begin with %v", logger.FormatStruct(t)) + defer log.Debugf("end") + + // TODO EC的锁 + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 读取Package信息和包含的Object信息 + Package().ReadOne(t.packageID).Object().ReadAny(). + // 读取Rep对象的配置 + ObjectRep().ReadAny(). + // 创建Cache记录 + Cache().CreateAny(). + IPFS(). + // pin文件 + CreateAnyRep(*globals.Local.NodeID). + MutexLock(ctx.distlock) + if err != nil { + return fmt.Errorf("acquiring distlock: %w", err) + } + defer mutex.Unlock() + + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) + if err != nil { + return fmt.Errorf("getting package: %w", err) + } + + if pkgResp.Redundancy.IsRepInfo() { + return t.moveRep(ctx, coorCli, pkgResp.Package) + } else { + // TODO EC的CacheMove逻辑 + } + + return nil +} +func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, pkg model.Package) error { + getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID)) + if err != nil { + return fmt.Errorf("getting package object rep data: %w", err) + } + + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + var fileHashes []string + for _, rep := range getRepResp.Data { + if err := ipfsCli.Pin(rep.FileHash); err != nil { + return fmt.Errorf("pinning file %s: %w", rep.FileHash, err) + 
} + + fileHashes = append(fileHashes, rep.FileHash) + } + + _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes)) + if err != nil { + return fmt.Errorf("reporting cache package moved: %w", err) + } + + return nil +} diff --git a/internal/task/create_ec_package.go b/internal/task/create_ec_package.go index fb69e8c..61fad01 100644 --- a/internal/task/create_ec_package.go +++ b/internal/task/create_ec_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" @@ -24,6 +25,10 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera } func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + log := logger.WithType[CreateECPackage]("Task") + log.Debugf("begin") + defer log.Debugf("end") + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ UpdatePackageContext: &cmd.UpdatePackageContext{ Distlock: ctx.distlock, diff --git a/internal/task/create_rep_package.go b/internal/task/create_rep_package.go index 7b15c64..1ab9401 100644 --- a/internal/task/create_rep_package.go +++ b/internal/task/create_rep_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" ) @@ -23,6 +24,10 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter } func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + log := logger.WithType[CreateRepPackage]("Task") + log.Debugf("begin") + defer log.Debugf("end") + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, })