diff --git a/go.mod b/go.mod index 2958102..8ddc7e0 100644 --- a/go.mod +++ b/go.mod @@ -76,14 +76,14 @@ require ( go 1.18 // 运行go mod tidy时需要将下面几行取消注释 -replace gitlink.org.cn/cloudream/ec => ../ec - -replace gitlink.org.cn/cloudream/proto => ../proto - -replace gitlink.org.cn/cloudream/rabbitmq => ../rabbitmq - -replace gitlink.org.cn/cloudream/common => ../common - -replace gitlink.org.cn/cloudream/db => ../db - -replace magefiles => ../magefiles +// replace gitlink.org.cn/cloudream/ec => ../ec +// +// replace gitlink.org.cn/cloudream/proto => ../proto +// +// replace gitlink.org.cn/cloudream/rabbitmq => ../rabbitmq +// +// replace gitlink.org.cn/cloudream/common => ../common +// +// replace gitlink.org.cn/cloudream/db => ../db +// +// replace magefiles => ../magefiles diff --git a/internal/services/cmd/agent.go b/internal/services/cmd/agent.go index 8d21db9..443480a 100644 --- a/internal/services/cmd/agent.go +++ b/internal/services/cmd/agent.go @@ -10,9 +10,9 @@ func (svc *Service) GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *ramsg var ipfsState string if svc.ipfs.IsUp() { - ipfsState = consts.IPFS_STATE_OK + ipfsState = consts.IPFSStateOK } else { - ipfsState = consts.IPFS_STATE_OK + ipfsState = consts.IPFSStateOK } return ramsg.ReplyOK(agtmsg.NewGetStateRespBody(ipfsState)) diff --git a/internal/services/cmd/ipfs.go b/internal/services/cmd/ipfs.go index 7030568..ec26378 100644 --- a/internal/services/cmd/ipfs.go +++ b/internal/services/cmd/ipfs.go @@ -17,7 +17,7 @@ func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *ra filesMap, err := svc.ipfs.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return ramsg.ReplyFailed[agtmsg.CheckIPFSResp](errorcode.OPERATION_FAILED, "get pinned files from ipfs failed") + return ramsg.ReplyFailed[agtmsg.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") } // TODO 根据锁定清单过滤被锁定的文件的记录 @@ -33,9 +33,9 @@ func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]sh for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] if ok { - if cache.State == consts.CACHE_STATE_PINNED { + if cache.State == consts.CacheStatePinned { // 不处理 - } else if cache.State == consts.CACHE_STATE_TEMP { + } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") err := svc.ipfs.Unpin(cache.FileHash) if err != nil { @@ -47,10 +47,10 @@ func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]sh delete(filesMap, cache.FileHash) } else { - if cache.State == consts.CACHE_STATE_PINNED { + if cache.State == consts.CacheStatePinned { svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash)) - } else if cache.State == consts.CACHE_STATE_TEMP { + } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { entries = append(entries, agtmsg.NewCheckIPFSRespEntry(cache.FileHash, agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } @@ -68,9 +68,9 @@ func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]she for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] if ok { - if cache.State == consts.CACHE_STATE_PINNED { + if cache.State == consts.CacheStatePinned { // 不处理 - } else if cache.State == consts.CACHE_STATE_TEMP { + } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") err := svc.ipfs.Unpin(cache.FileHash) if err != nil { @@ -82,10 +82,10 @@ func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]she delete(filesMap, cache.FileHash) } else { - if cache.State == consts.CACHE_STATE_PINNED { + if cache.State == consts.CacheStatePinned { svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash)) - } else if cache.State == consts.CACHE_STATE_TEMP { + } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { entries = append(entries, agtmsg.NewCheckIPFSRespEntry(cache.FileHash, agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } diff --git a/internal/services/cmd/object.go b/internal/services/cmd/object.go index 31ae03a..5ea99ab 100644 --- a/internal/services/cmd/object.go +++ b/internal/services/cmd/object.go @@ -18,7 +18,7 @@ func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg. if tsk.Error() != nil { log.WithField("FileHash", msg.FileHash). Warnf("pin object failed, err: %s", tsk.Error().Error()) - return ramsg.ReplyFailed[agtmsg.StartPinningObjectResp](errorcode.OPERATION_FAILED, "pin object failed") + return ramsg.ReplyFailed[agtmsg.StartPinningObjectResp](errorcode.OperationFailed, "pin object failed") } return ramsg.ReplyOK(agtmsg.NewStartPinningObjectResp(tsk.ID())) @@ -29,7 +29,7 @@ func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.Wa tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return ramsg.ReplyFailed[agtmsg.WaitPinningObjectResp](errorcode.TASK_NOT_FOUND, "task not found") + return ramsg.ReplyFailed[agtmsg.WaitPinningObjectResp](errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { diff --git a/internal/services/cmd/storage.go b/internal/services/cmd/storage.go index 201c970..c3f8aa1 100644 --- a/internal/services/cmd/storage.go +++ b/internal/services/cmd/storage.go @@ -13,6 +13,7 @@ import ( "gitlink.org.cn/cloudream/agent/internal/config" "gitlink.org.cn/cloudream/agent/internal/task" "gitlink.org.cn/cloudream/common/consts" + "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/utils" "gitlink.org.cn/cloudream/ec" @@ -26,11 +27,11 @@ func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObjec outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID) outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory, outFileName) - if repRed, ok := msg.Redundancy.(ramsg.RepRedundancyData); ok { + if repRed, ok := msg.Redundancy.(models.RepRedundancyData); ok { taskID, err := service.moveRepObject(repRed, outFilePath) if err != nil { logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error()) - return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OPERATION_FAILED, "move rep object failed") + return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move rep object failed") } return ramsg.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID)) @@ -38,11 +39,11 @@ func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObjec } else { // TODO 处理其他备份类型 - return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OPERATION_FAILED, "not implement yet!") + return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "not implement yet!") } } -func (svc *Service) moveRepObject(repData ramsg.RepRedundancyData, outFilePath string) (string, error) { +func (svc *Service) moveRepObject(repData models.RepRedundancyData, outFilePath string) (string, error) { tsk := svc.taskManager.StartComparable(task.NewIPFSRead(repData.FileHash, outFilePath)) return tsk.ID(), nil } @@ -52,7 +53,7 @@ func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*a tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return ramsg.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TASK_NOT_FOUND, "task not found") + return ramsg.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { @@ -125,7 +126,7 @@ func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos [] // 增量情况下,不需要对infosMap中没检查的记录进行处理 - return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) + return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) } func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) { @@ -153,7 +154,7 @@ func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []f // Storage中多出来的文件不做处理 - return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.STORAGE_DIRECTORY_STATE_OK, entries)) + return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) } /* @@ -296,14 +297,14 @@ func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRe file, err := os.Open(fullPath) if err != nil { logger.Warnf("opening file %s: %s", fullPath, err.Error()) - return nil, ramsg.Failed(errorcode.OPERATION_FAILED, "open file failed") + return nil, ramsg.Failed(errorcode.OperationFailed, "open file failed") } fileInfo, err := file.Stat() if err != nil { file.Close() logger.Warnf("getting file %s state: %s", fullPath, err.Error()) - return nil, ramsg.Failed(errorcode.OPERATION_FAILED, "get file info failed") + return nil, ramsg.Failed(errorcode.OperationFailed, "get file info failed") } fileSize := fileInfo.Size() @@ -322,7 +323,7 @@ func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRe func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *ramsg.CodeMessage) { tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return nil, ramsg.Failed(errorcode.TASK_NOT_FOUND, "task not found") + return nil, ramsg.Failed(errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { diff --git a/internal/task/upload_rep_objects.go b/internal/task/upload_rep_objects.go index 4a0a3d8..6edcba0 100644 --- a/internal/task/upload_rep_objects.go +++ b/internal/task/upload_rep_objects.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/agent/internal/config" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" + "gitlink.org.cn/cloudream/common/utils" mygrpc "gitlink.org.cn/cloudream/common/utils/grpc" "gitlink.org.cn/cloudream/common/utils/ipfs" @@ -207,8 +208,10 @@ func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject Uplo uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID) } + dirName := utils.GetDirectoryName(uploadObject.ObjectName) + // 记录写入的文件的Hash - createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash)) + createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash, dirName)) if err != nil { return 0, "", fmt.Errorf("creating rep object: %w", err) } diff --git a/status_report.go b/status_report.go index 66c0b41..2e1d677 100644 --- a/status_report.go +++ b/status_report.go @@ -50,9 +50,9 @@ func reportStatus(wg *sync.WaitGroup) { } waitG.Wait() //TODO: 查看本地IPFS daemon是否正常,记录到ipfsStatus - ipfsStatus := consts.IPFS_STATE_OK + ipfsStatus := consts.IPFSStateOK //TODO:访问自身资源目录(配置文件中获取路径),记录是否正常,记录到localDirStatus - localDirStatus := consts.STORAGE_DIRECTORY_STATE_OK + localDirStatus := consts.StorageDirectoryStateOK //发送心跳 // TODO 由于数据结构未定,暂时不发送真实数据