diff --git a/internal/config/config.go b/internal/config/config.go index c4617f1..a04949b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,25 +2,25 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" log "gitlink.org.cn/cloudream/common/pkgs/logger" c "gitlink.org.cn/cloudream/common/utils/config" - "gitlink.org.cn/cloudream/common/utils/ipfs" + stgmodels "gitlink.org.cn/cloudream/storage-common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/grpc" stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Config struct { - ID int64 `json:"id"` - GRPCListenAddress string `json:"grpcListenAddress"` - GRPCPort int `json:"grpcPort"` - ECPacketSize int64 `json:"ecPacketSize"` - LocalIP string `json:"localIP"` - ExternalIP string `json:"externalIP"` - StorageBaseDir string `json:"storageBaseDir"` - TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 - Logger log.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS ipfs.Config `json:"ipfs"` - DistLock distlock.Config `json:"distlock"` + ID int64 `json:"id"` + Local stgmodels.LocalMachineInfo `json:"local"` + GRPC *grpc.Config `json:"grpc"` + ECPacketSize int64 `json:"ecPacketSize"` + StorageBaseDir string `json:"storageBaseDir"` + TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 + Logger log.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS ipfs.Config `json:"ipfs"` + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/internal/services/cmd/agent.go b/internal/services/cmd/agent.go deleted file mode 100644 index 6fbfc7f..0000000 --- a/internal/services/cmd/agent.go +++ /dev/null @@ -1,19 +0,0 @@ -package cmd - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage-common/consts" - agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" -) - -func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { - var ipfsState string - - if svc.ipfs.IsUp() { - ipfsState = consts.IPFSStateOK - } else { - ipfsState = consts.IPFSStateOK - } - - return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) -} diff --git a/internal/services/cmd/service.go b/internal/services/cmd/service.go deleted file mode 100644 index c9eecd1..0000000 --- a/internal/services/cmd/service.go +++ /dev/null @@ -1,21 +0,0 @@ -package cmd - -import ( - "gitlink.org.cn/cloudream/common/utils/ipfs" - "gitlink.org.cn/cloudream/storage-agent/internal/task" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" -) - -type Service struct { - ipfs *ipfs.IPFS - taskManager *task.Manager - coordinator *coormq.Client -} - -func NewService(ipfs *ipfs.IPFS, taskMgr *task.Manager, coordinator *coormq.Client) *Service { - return &Service{ - ipfs: ipfs, - taskManager: taskMgr, - coordinator: coordinator, - } -} diff --git a/internal/services/grpc/grpc_service.go b/internal/services/grpc/service.go similarity index 67% rename from internal/services/grpc/grpc_service.go rename to internal/services/grpc/service.go index 11210b6..57a76e4 100644 --- a/internal/services/grpc/grpc_service.go +++ b/internal/services/grpc/service.go @@ -6,25 +6,29 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/common/utils/ipfs" - agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" + "gitlink.org.cn/cloudream/storage-common/globals" + agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" ) -type GRPCService struct { - agentserver.FileTransportServer - ipfs *ipfs.IPFS +type Service struct { + agentserver.AgentServer } -func NewService(ipfs *ipfs.IPFS) *GRPCService { - return &GRPCService{ - ipfs: ipfs, - } +func NewService() *Service { + return &Service{} } -func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) error { +func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error { log.Debugf("client upload file") - writer, err := s.ipfs.CreateFile() + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + log.Warnf("new ipfs client: %s", err.Error()) + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + writer, err := ipfsCli.CreateFileStream() if err != nil { log.Warnf("create file failed, err: %s", err.Error()) return fmt.Errorf("create file failed, err: %w", err) @@ -45,18 +49,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) return fmt.Errorf("recv message failed, err: %w", err) } - if msg.Type == agentserver.FileDataPacketType_Data { - err = myio.WriteAll(writer, msg.Data) - if err != nil { - // 关闭文件写入,不需要返回的hash和error - writer.Abort(io.ErrClosedPipe) - log.Warnf("write data to file failed, err: %s", err.Error()) - return fmt.Errorf("write data to file failed, err: %w", err) - } + err = myio.WriteAll(writer, msg.Data) + if err != nil { + // 关闭文件写入,不需要返回的hash和error + writer.Abort(io.ErrClosedPipe) + log.Warnf("write data to file failed, err: %s", err.Error()) + return fmt.Errorf("write data to file failed, err: %w", err) + } - recvSize += int64(len(msg.Data)) + recvSize += int64(len(msg.Data)) - } else if msg.Type == agentserver.FileDataPacketType_EOF { + if msg.Type == agentserver.FileDataPacketType_EOF { // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash hash, err := writer.Finish() if err != nil { @@ -65,7 +68,7 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) } // 并将结果返回到客户端 - err = server.SendAndClose(&agentserver.SendResp{ + err = server.SendAndClose(&agentserver.SendIPFSFileResp{ FileHash: hash, }) if err != nil { @@ -78,10 +81,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) } } -func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTransport_GetFileServer) error { +func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserver.Agent_GetIPFSFileServer) error { log.WithField("FileHash", req.FileHash).Debugf("client download file") - reader, err := s.ipfs.OpenRead(req.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + log.Warnf("new ipfs client: %s", err.Error()) + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + reader, err := ipfsCli.OpenRead(req.FileHash) if err != nil { log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error()) return fmt.Errorf("open file to read failed, err: %w", err) diff --git a/internal/services/mq/agent.go b/internal/services/mq/agent.go new file mode 100644 index 0000000..5246465 --- /dev/null +++ b/internal/services/mq/agent.go @@ -0,0 +1,29 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" +) + +func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { + var ipfsState string + + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + logger.Warnf("new ipfs client: %s", err.Error()) + ipfsState = consts.IPFSStateUnavailable + + } else { + if ipfsCli.IsUp() { + ipfsState = consts.IPFSStateOK + } else { + ipfsState = consts.IPFSStateUnavailable + } + ipfsCli.Close() + } + + return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) +} diff --git a/internal/services/cmd/ipfs.go b/internal/services/mq/cache.go similarity index 50% rename from internal/services/cmd/ipfs.go rename to internal/services/mq/cache.go index 5d56fb9..ce76401 100644 --- a/internal/services/cmd/ipfs.go +++ b/internal/services/mq/cache.go @@ -1,34 +1,44 @@ -package cmd +package mq import ( "time" shell "github.com/ipfs/go-ipfs-api" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" + mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" ) -func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { - filesMap, err := svc.ipfs.GetPinnedFiles() +func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + logger.Warnf("new ipfs client: %s", err.Error()) + return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "new ipfs client failed") + } + defer ipfsCli.Close() + + filesMap, err := ipfsCli.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") + return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "get pinned files from ipfs failed") } // TODO 根据锁定清单过滤被锁定的文件的记录 if msg.IsComplete { - return svc.checkComplete(msg, filesMap) + return svc.checkComplete(msg, filesMap, ipfsCli) } else { - return svc.checkIncrement(msg, filesMap) + return svc.checkIncrement(msg, filesMap, ipfsCli) } } -func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -37,7 +47,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she // 不处理 } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") - err := svc.ipfs.Unpin(cache.FileHash) + err := ipfsCli.Unpin(cache.FileHash) if err != nil { logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } @@ -52,7 +62,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { - entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } } @@ -60,10 +70,10 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she // 增量情况下,不需要对filesMap中没检查的记录进行处理 - return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) + return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) } -func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -72,7 +82,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel // 不处理 } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") - err := svc.ipfs.Unpin(cache.FileHash) + err := ipfsCli.Unpin(cache.FileHash) if err != nil { logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } @@ -87,7 +97,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { - entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } } @@ -96,12 +106,48 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel // map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录 for hash := range filesMap { logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry") - err := svc.ipfs.Unpin(hash) + err := ipfsCli.Unpin(hash) if err != nil { logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) } - entries = append(entries, agtmq.NewCheckIPFSRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP)) + } + + return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) +} + +func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) { + tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID)) + return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID())) +} + +func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) { + tsk := svc.taskManager.FindByID(msg.TaskID) + if tsk == nil { + return mq.ReplyFailed[agtmq.WaitCacheMovePackageResp](errorcode.TaskNotFound, "task not found") } - return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) + if msg.WaitTimeoutMs == 0 { + tsk.Wait() + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + + } else { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) + } } diff --git a/internal/services/cmd/object.go b/internal/services/mq/object.go similarity index 99% rename from internal/services/cmd/object.go rename to internal/services/mq/object.go index c1972c4..499ffb5 100644 --- a/internal/services/cmd/object.go +++ b/internal/services/mq/object.go @@ -1,4 +1,4 @@ -package cmd +package mq import ( "time" diff --git a/internal/services/mq/service.go b/internal/services/mq/service.go new file mode 100644 index 0000000..52a9599 --- /dev/null +++ b/internal/services/mq/service.go @@ -0,0 +1,15 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/storage-agent/internal/task" +) + +type Service struct { + taskManager *task.Manager +} + +func NewService(taskMgr *task.Manager) *Service { + return &Service{ + taskManager: taskMgr, + } +} diff --git a/internal/services/cmd/storage.go b/internal/services/mq/storage.go similarity index 63% rename from internal/services/cmd/storage.go rename to internal/services/mq/storage.go index 19e4859..3aeeabb 100644 --- a/internal/services/cmd/storage.go +++ b/internal/services/mq/storage.go @@ -1,30 +1,36 @@ -package cmd +package mq import ( "io/fs" - "io/ioutil" "os" "path/filepath" "time" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" - "gitlink.org.cn/cloudream/storage-common/utils" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/mq" - stgcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - myos "gitlink.org.cn/cloudream/storage-common/utils/os" + "gitlink.org.cn/cloudream/storage-common/utils" ) -func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) { - getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) +func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("getting storage info: %s", err.Error()) @@ -32,7 +38,7 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageMovePackageDirName(msg.PackageID, msg.UserID)) + outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID)) if err = os.MkdirAll(outputDirPath, 0755); err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("creating output directory: %s", err.Error()) @@ -40,16 +46,16 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](stgcmd.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))) - return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID())) + tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) + return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) } -func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*agtmq.WaitStorageMovePackageResp, *mq.CodeMessage) { - logger.WithField("TaskID", msg.TaskID).Debugf("wait moving package") +func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*agtmq.WaitStorageLoadPackageResp, *mq.CodeMessage) { + logger.WithField("TaskID", msg.TaskID).Debugf("wait loading package") tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return mq.ReplyFailed[agtmq.WaitStorageMovePackageResp](errorcode.TaskNotFound, "task not found") + return mq.ReplyFailed[agtmq.WaitStorageLoadPackageResp](errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { @@ -60,7 +66,7 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -70,17 +76,17 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) } - return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(false, "")) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "")) } } func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) - infos, err := ioutil.ReadDir(dirFullPath) + infos, err := os.ReadDir(dirFullPath) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) return mq.ReplyOK(agtmq.NewStorageCheckResp( @@ -89,30 +95,30 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe )) } - fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() }) + dirInfos := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() }) if msg.IsComplete { - return svc.checkStorageComplete(msg, fileInfos) + return svc.checkStorageComplete(msg, dirInfos) } else { - return svc.checkStorageIncrement(msg, fileInfos) + return svc.checkStorageIncrement(msg, dirInfos) } } -func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) { - infosMap := make(map[string]fs.FileInfo) - for _, info := range fileInfos { +func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) { + infosMap := make(map[string]fs.DirEntry) + for _, info := range dirInfos { infosMap[info.Name()] = info } var entries []agtmq.StorageCheckRespEntry for _, obj := range msg.Packages { - fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID) - _, ok := infosMap[fileName] + dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) + _, ok := infosMap[dirName] if ok { // 不需要做处理 // 删除map中的记录,表示此记录已被检查过 - delete(infosMap, fileName) + delete(infosMap, dirName) } else { // 只要文件不存在,就删除StoragePackage表中的记录 @@ -125,22 +131,22 @@ func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []f return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) } -func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) { +func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) { - infosMap := make(map[string]fs.FileInfo) - for _, info := range fileInfos { + infosMap := make(map[string]fs.DirEntry) + for _, info := range dirInfos { infosMap[info.Name()] = info } var entries []agtmq.StorageCheckRespEntry for _, obj := range msg.Packages { - fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID) - _, ok := infosMap[fileName] + dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) + _, ok := infosMap[dirName] if ok { // 不需要做处理 // 删除map中的记录,表示此记录已被检查过 - delete(infosMap, fileName) + delete(infosMap, dirName) } else { // 只要文件不存在,就删除StoragePackage表中的记录 @@ -152,7 +158,15 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs } func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { - getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("getting storage info: %s", err.Error()) @@ -160,7 +174,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - fullPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path) + fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)) var uploadFilePathes []string err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { @@ -180,7 +194,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") } - objIter := myos.NewUploadingObjectIterator(fullPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) if msg.Redundancy.Type == models.RedundancyRep { repInfo, err := msg.Redundancy.ToRepInfo() @@ -190,14 +204,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext]( - stgcmd.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, stgcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: &config.Cfg().ID, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -208,14 +215,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext]( - stgcmd.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, config.Cfg().ECPacketSize, stgcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: &config.Cfg().ID, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -231,17 +231,16 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) } - wrapTask := tsk.Body().(*stgcmd.TaskWrapper[mytask.TaskContext]) - if tsk.Error() != nil { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0)) } - if repTask, ok := wrapTask.InnerTask().(*stgcmd.CreateRepPackage); ok { + // TODO 避免判断类型 + if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID)) } - if ecTask, ok := wrapTask.InnerTask().(*stgcmd.CreateECPackage); ok { + if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID)) } diff --git a/internal/task/cache_move_package.go b/internal/task/cache_move_package.go new file mode 100644 index 0000000..26cc3a7 --- /dev/null +++ b/internal/task/cache_move_package.go @@ -0,0 +1,102 @@ +package task + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +type CacheMovePackage struct { + userID int64 + packageID int64 +} + +func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { + return &CacheMovePackage{ + userID: userID, + packageID: packageID, + } +} + +func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.do(ctx) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} + +func (t *CacheMovePackage) do(ctx TaskContext) error { + log := logger.WithType[CacheMovePackage]("Task") + log.Debugf("begin with %v", logger.FormatStruct(t)) + defer log.Debugf("end") + + // TOOD EC的锁 + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 读取Package信息和包含的Object信息 + Package().ReadOne(t.packageID).Object().ReadAny(). + // 读取Rep对象的配置 + ObjectRep().ReadAny(). + // 创建Cache记录 + Cache().CreateAny(). + IPFS(). + // pin文件 + CreateAnyRep(*globals.Local.NodeID). + MutexLock(ctx.distlock) + if err != nil { + return fmt.Errorf("acquiring distlock: %w", err) + } + defer mutex.Unlock() + + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) + if err != nil { + return fmt.Errorf("getting package: %w", err) + } + + if pkgResp.Redundancy.IsRepInfo() { + return t.moveRep(ctx, coorCli, pkgResp.Package) + } else { + // TODO EC的CacheMove逻辑 + } + + return nil +} +func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, pkg model.Package) error { + getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID)) + if err != nil { + return fmt.Errorf("getting package object rep data: %w", err) + } + + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + var fileHashes []string + for _, rep := range getRepResp.Data { + if err := ipfsCli.Pin(rep.FileHash); err != nil { + return fmt.Errorf("pinning file %s: %w", rep.FileHash, err) + } + + fileHashes = append(fileHashes, rep.FileHash) + } + + _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes)) + if err != nil { + return fmt.Errorf("reporting cache package moved: %w", err) + } + + return nil +} diff --git a/internal/task/create_ec_package.go b/internal/task/create_ec_package.go new file mode 100644 index 0000000..61fad01 --- /dev/null +++ b/internal/task/create_ec_package.go @@ -0,0 +1,43 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-agent/internal/config" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateECPackageResult = cmd.CreateECPackageResult + +type CreateECPackage struct { + cmd cmd.CreateECPackage + + Result *CreateECPackageResult +} + +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { + return &CreateECPackage{ + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + log := logger.WithType[CreateECPackage]("Task") + log.Debugf("begin") + defer log.Debugf("end") + + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ + UpdatePackageContext: &cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }, + ECPacketSize: config.Cfg().ECPacketSize, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/create_rep_package.go b/internal/task/create_rep_package.go new file mode 100644 index 0000000..1ab9401 --- /dev/null +++ b/internal/task/create_rep_package.go @@ -0,0 +1,39 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateRepPackageResult = cmd.CreateRepPackageResult + +type CreateRepPackage struct { + cmd cmd.CreateRepPackage + + Result *CreateRepPackageResult +} + +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { + return &CreateRepPackage{ + cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + log := logger.WithType[CreateRepPackage]("Task") + log.Debugf("begin") + defer log.Debugf("end") + + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/download_package.go b/internal/task/download_package.go new file mode 100644 index 0000000..c3c39ab --- /dev/null +++ b/internal/task/download_package.go @@ -0,0 +1,26 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" +) + +type DownloadPackage struct { + cmd *cmd.DownloadPackage +} + +func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { + return &DownloadPackage{ + cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), + } +} +func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.cmd.Execute(&cmd.DownloadPackageContext{ + Distlock: ctx.distlock, + }) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/ipfs_pin.go b/internal/task/ipfs_pin.go index 1a7fe7a..0489d5c 100644 --- a/internal/task/ipfs_pin.go +++ b/internal/task/ipfs_pin.go @@ -5,6 +5,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" ) type IPFSPin struct { @@ -31,7 +32,19 @@ func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) { log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") - err := ctx.ipfs.Pin(t.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + err := fmt.Errorf("new ipfs client: %w", err) + log.Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer ipfsCli.Close() + + err = ipfsCli.Pin(t.FileHash) if err != nil { err := fmt.Errorf("pin file failed, err: %w", err) log.WithField("FileHash", t.FileHash).Warn(err.Error()) diff --git a/internal/task/ipfs_read.go b/internal/task/ipfs_read.go index 91fb877..7ddfa6f 100644 --- a/internal/task/ipfs_read.go +++ b/internal/task/ipfs_read.go @@ -8,6 +8,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" ) type IPFSRead struct { @@ -61,7 +62,19 @@ func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) { } defer outputFile.Close() - rd, err := ctx.ipfs.OpenRead(t.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + err := fmt.Errorf("new ipfs client: %w", err) + log.Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer ipfsCli.Close() + + rd, err := ipfsCli.OpenRead(t.FileHash) if err != nil { err := fmt.Errorf("read ipfs file failed, err: %w", err) log.WithField("FileHash", t.FileHash).Warn(err.Error()) diff --git a/internal/task/task.go b/internal/task/task.go index 6bbd84f..3653981 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -3,14 +3,10 @@ package task import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/common/utils/ipfs" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type TaskContext struct { - ipfs *ipfs.IPFS - coordinator *coormq.Client - distlock *distsvc.Service + distlock *distsvc.Service } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -25,10 +21,8 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(ipfs *ipfs.IPFS, coorCli *coormq.Client, distLock *distsvc.Service) Manager { +func NewManager(distlock *distsvc.Service) Manager { return task.NewManager(TaskContext{ - ipfs: ipfs, - coordinator: coorCli, - distlock: distLock, + distlock: distlock, }) } diff --git a/main.go b/main.go index b568b39..2536aea 100644 --- a/main.go +++ b/main.go @@ -6,20 +6,19 @@ import ( "os" "sync" - distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" log "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" - agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" + "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock" + agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" "google.golang.org/grpc" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd" grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc" + cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/mq" ) // TODO 此数据是否在运行时会发生变化? @@ -41,17 +40,14 @@ func main() { os.Exit(1) } - ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS) - if err != nil { - log.Fatalf("new ipfs failed, err: %s", err.Error()) - } - - coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Fatalf("new ipfs failed, err: %s", err.Error()) - } + globals.InitLocal(&config.Cfg().Local) + globals.InitMQPool(&config.Cfg().RabbitMQ) + globals.InitAgentRPCPool(&agtrpc.PoolConfig{ + Port: config.Cfg().GRPC.Port, + }) + globals.InitIPFSPool(&config.Cfg().IPFS) - distlock, err := distsvc.NewService(&config.Cfg().DistLock) + distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { log.Fatalf("new ipfs failed, err: %s", err.Error()) } @@ -60,11 +56,11 @@ func main() { wg := sync.WaitGroup{} wg.Add(5) - taskMgr := task.NewManager(ipfs, coorCli, distlock) + taskMgr := task.NewManager(distlock) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(ipfs, &taskMgr, coorCli), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -76,14 +72,14 @@ func main() { go reportStatus(&wg) //网络延迟感知 //面向客户端收发数据 - listenAddr := config.Cfg().GRPCListenAddress + listenAddr := config.Cfg().GRPC.MakeListenAddress() lis, err := net.Listen("tcp", listenAddr) if err != nil { log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } s := grpc.NewServer() - agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs)) + agtrpc.RegisterAgentServer(s, grpcsvc.NewService()) go serveGRPC(s, lis, &wg) go serveDistLock(distlock) @@ -119,7 +115,7 @@ func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) { wg.Done() } -func serveDistLock(svc *distsvc.Service) { +func serveDistLock(svc *distlock.Service) { log.Info("start serving distlock") err := svc.Serve() diff --git a/status_report.go b/status_report.go index e4ebc71..c7ebbe7 100644 --- a/status_report.go +++ b/status_report.go @@ -7,13 +7,12 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-common/consts" - coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage-common/utils" ) func reportStatus(wg *sync.WaitGroup) { - coorCli, err := coorcli.NewClient(&config.Cfg().RabbitMQ) + coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) if err != nil { wg.Done() log.Error("new coordinator client failed, err: %w", err) @@ -56,7 +55,7 @@ func reportStatus(wg *sync.WaitGroup) { //发送心跳 // TODO 由于数据结构未定,暂时不发送真实数据 - coorCli.AgentStatusReport(coormsg.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) + coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) time.Sleep(time.Minute * 5) }