From 41a9aed6e96e54ab5ed3022ae9a901b013bdc605 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Aug 2023 10:15:57 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ClientPool=EF=BC=8C?= =?UTF-8?q?=E8=A7=A3=E9=99=A4=E5=AF=B9config=E7=9A=84=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/config/config.go | 26 ++++----- internal/services/cmd/agent.go | 19 ------ internal/services/cmd/service.go | 21 ------- .../grpc/{grpc_service.go => service.go} | 58 +++++++++++-------- internal/services/mq/agent.go | 29 ++++++++++ internal/services/{cmd => mq}/ipfs.go | 27 ++++++--- internal/services/{cmd => mq}/object.go | 2 +- internal/services/mq/service.go | 15 +++++ internal/services/{cmd => mq}/storage.go | 55 +++++++++--------- internal/task/create_ec_package.go | 38 ++++++++++++ internal/task/create_rep_package.go | 34 +++++++++++ internal/task/download_package.go | 26 +++++++++ internal/task/ipfs_pin.go | 15 ++++- internal/task/ipfs_read.go | 15 ++++- internal/task/task.go | 12 +--- main.go | 30 +++++----- status_report.go | 7 +-- 17 files changed, 283 insertions(+), 146 deletions(-) delete mode 100644 internal/services/cmd/agent.go delete mode 100644 internal/services/cmd/service.go rename internal/services/grpc/{grpc_service.go => service.go} (67%) create mode 100644 internal/services/mq/agent.go rename internal/services/{cmd => mq}/ipfs.go (81%) rename internal/services/{cmd => mq}/object.go (99%) create mode 100644 internal/services/mq/service.go rename internal/services/{cmd => mq}/storage.go (82%) create mode 100644 internal/task/create_ec_package.go create mode 100644 internal/task/create_rep_package.go create mode 100644 internal/task/download_package.go diff --git a/internal/config/config.go b/internal/config/config.go index c4617f1..a04949b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,25 +2,25 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" log "gitlink.org.cn/cloudream/common/pkgs/logger" c "gitlink.org.cn/cloudream/common/utils/config" - "gitlink.org.cn/cloudream/common/utils/ipfs" + stgmodels "gitlink.org.cn/cloudream/storage-common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/grpc" stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Config struct { - ID int64 `json:"id"` - GRPCListenAddress string `json:"grpcListenAddress"` - GRPCPort int `json:"grpcPort"` - ECPacketSize int64 `json:"ecPacketSize"` - LocalIP string `json:"localIP"` - ExternalIP string `json:"externalIP"` - StorageBaseDir string `json:"storageBaseDir"` - TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 - Logger log.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS ipfs.Config `json:"ipfs"` - DistLock distlock.Config `json:"distlock"` + ID int64 `json:"id"` + Local stgmodels.LocalMachineInfo `json:"local"` + GRPC *grpc.Config `json:"grpc"` + ECPacketSize int64 `json:"ecPacketSize"` + StorageBaseDir string `json:"storageBaseDir"` + TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 + Logger log.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS ipfs.Config `json:"ipfs"` + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/internal/services/cmd/agent.go b/internal/services/cmd/agent.go deleted file mode 100644 index 6fbfc7f..0000000 --- a/internal/services/cmd/agent.go +++ /dev/null @@ -1,19 +0,0 @@ -package cmd - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage-common/consts" - agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" -) - -func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { - var ipfsState string - - if svc.ipfs.IsUp() { - ipfsState = consts.IPFSStateOK - } else { - ipfsState = consts.IPFSStateOK - } - - return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) -} diff --git a/internal/services/cmd/service.go b/internal/services/cmd/service.go deleted file mode 100644 index c9eecd1..0000000 --- a/internal/services/cmd/service.go +++ /dev/null @@ -1,21 +0,0 @@ -package cmd - -import ( - "gitlink.org.cn/cloudream/common/utils/ipfs" - "gitlink.org.cn/cloudream/storage-agent/internal/task" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" -) - -type Service struct { - ipfs *ipfs.IPFS - taskManager *task.Manager - coordinator *coormq.Client -} - -func NewService(ipfs *ipfs.IPFS, taskMgr *task.Manager, coordinator *coormq.Client) *Service { - return &Service{ - ipfs: ipfs, - taskManager: taskMgr, - coordinator: coordinator, - } -} diff --git a/internal/services/grpc/grpc_service.go b/internal/services/grpc/service.go similarity index 67% rename from internal/services/grpc/grpc_service.go rename to internal/services/grpc/service.go index 11210b6..57a76e4 100644 --- a/internal/services/grpc/grpc_service.go +++ b/internal/services/grpc/service.go @@ -6,25 +6,29 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/common/utils/ipfs" - agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" + "gitlink.org.cn/cloudream/storage-common/globals" + agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" ) -type GRPCService struct { - agentserver.FileTransportServer - ipfs *ipfs.IPFS +type Service struct { + agentserver.AgentServer } -func NewService(ipfs *ipfs.IPFS) *GRPCService { - return &GRPCService{ - ipfs: ipfs, - } +func NewService() *Service { + return &Service{} } -func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) error { +func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error { log.Debugf("client upload file") - writer, err := s.ipfs.CreateFile() + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + log.Warnf("new ipfs client: %s", err.Error()) + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + writer, err := ipfsCli.CreateFileStream() if err != nil { log.Warnf("create file failed, err: %s", err.Error()) return fmt.Errorf("create file failed, err: %w", err) @@ -45,18 +49,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) return fmt.Errorf("recv message failed, err: %w", err) } - if msg.Type == agentserver.FileDataPacketType_Data { - err = myio.WriteAll(writer, msg.Data) - if err != nil { - // 关闭文件写入,不需要返回的hash和error - writer.Abort(io.ErrClosedPipe) - log.Warnf("write data to file failed, err: %s", err.Error()) - return fmt.Errorf("write data to file failed, err: %w", err) - } + err = myio.WriteAll(writer, msg.Data) + if err != nil { + // 关闭文件写入,不需要返回的hash和error + writer.Abort(io.ErrClosedPipe) + log.Warnf("write data to file failed, err: %s", err.Error()) + return fmt.Errorf("write data to file failed, err: %w", err) + } - recvSize += int64(len(msg.Data)) + recvSize += int64(len(msg.Data)) - } else if msg.Type == agentserver.FileDataPacketType_EOF { + if msg.Type == agentserver.FileDataPacketType_EOF { // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash hash, err := writer.Finish() if err != nil { @@ -65,7 +68,7 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) } // 并将结果返回到客户端 - err = server.SendAndClose(&agentserver.SendResp{ + err = server.SendAndClose(&agentserver.SendIPFSFileResp{ FileHash: hash, }) if err != nil { @@ -78,10 +81,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) } } -func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTransport_GetFileServer) error { +func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserver.Agent_GetIPFSFileServer) error { log.WithField("FileHash", req.FileHash).Debugf("client download file") - reader, err := s.ipfs.OpenRead(req.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + log.Warnf("new ipfs client: %s", err.Error()) + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + reader, err := ipfsCli.OpenRead(req.FileHash) if err != nil { log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error()) return fmt.Errorf("open file to read failed, err: %w", err) diff --git a/internal/services/mq/agent.go b/internal/services/mq/agent.go new file mode 100644 index 0000000..5246465 --- /dev/null +++ b/internal/services/mq/agent.go @@ -0,0 +1,29 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" +) + +func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { + var ipfsState string + + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + logger.Warnf("new ipfs client: %s", err.Error()) + ipfsState = consts.IPFSStateUnavailable + + } else { + if ipfsCli.IsUp() { + ipfsState = consts.IPFSStateOK + } else { + ipfsState = consts.IPFSStateUnavailable + } + ipfsCli.Close() + } + + return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) +} diff --git a/internal/services/cmd/ipfs.go b/internal/services/mq/ipfs.go similarity index 81% rename from internal/services/cmd/ipfs.go rename to internal/services/mq/ipfs.go index 5d56fb9..fa101ae 100644 --- a/internal/services/cmd/ipfs.go +++ b/internal/services/mq/ipfs.go @@ -1,20 +1,29 @@ -package cmd +package mq import ( "time" shell "github.com/ipfs/go-ipfs-api" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" ) func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { - filesMap, err := svc.ipfs.GetPinnedFiles() + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + logger.Warnf("new ipfs client: %s", err.Error()) + return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "new ipfs client failed") + } + defer ipfsCli.Close() + + filesMap, err := ipfsCli.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") @@ -22,13 +31,13 @@ func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.C // TODO 根据锁定清单过滤被锁定的文件的记录 if msg.IsComplete { - return svc.checkComplete(msg, filesMap) + return svc.checkComplete(msg, filesMap, ipfsCli) } else { - return svc.checkIncrement(msg, filesMap) + return svc.checkIncrement(msg, filesMap, ipfsCli) } } -func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -37,7 +46,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she // 不处理 } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") - err := svc.ipfs.Unpin(cache.FileHash) + err := ipfsCli.Unpin(cache.FileHash) if err != nil { logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } @@ -63,7 +72,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) } -func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -72,7 +81,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel // 不处理 } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") - err := svc.ipfs.Unpin(cache.FileHash) + err := ipfsCli.Unpin(cache.FileHash) if err != nil { logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } @@ -96,7 +105,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel // map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录 for hash := range filesMap { logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry") - err := svc.ipfs.Unpin(hash) + err := ipfsCli.Unpin(hash) if err != nil { logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) } diff --git a/internal/services/cmd/object.go b/internal/services/mq/object.go similarity index 99% rename from internal/services/cmd/object.go rename to internal/services/mq/object.go index c1972c4..499ffb5 100644 --- a/internal/services/cmd/object.go +++ b/internal/services/mq/object.go @@ -1,4 +1,4 @@ -package cmd +package mq import ( "time" diff --git a/internal/services/mq/service.go b/internal/services/mq/service.go new file mode 100644 index 0000000..52a9599 --- /dev/null +++ b/internal/services/mq/service.go @@ -0,0 +1,15 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/storage-agent/internal/task" +) + +type Service struct { + taskManager *task.Manager +} + +func NewService(taskMgr *task.Manager) *Service { + return &Service{ + taskManager: taskMgr, + } +} diff --git a/internal/services/cmd/storage.go b/internal/services/mq/storage.go similarity index 82% rename from internal/services/cmd/storage.go rename to internal/services/mq/storage.go index 19e4859..06711ed 100644 --- a/internal/services/cmd/storage.go +++ b/internal/services/mq/storage.go @@ -1,4 +1,4 @@ -package cmd +package mq import ( "io/fs" @@ -13,18 +13,26 @@ import ( "gitlink.org.cn/cloudream/storage-agent/internal/config" mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/utils" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/mq" - stgcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - myos "gitlink.org.cn/cloudream/storage-common/utils/os" ) func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) { - getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("getting storage info: %s", err.Error()) @@ -40,7 +48,7 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](stgcmd.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))) + tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID())) } @@ -152,7 +160,15 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs } func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { - getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("getting storage info: %s", err.Error()) @@ -180,7 +196,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") } - objIter := myos.NewUploadingObjectIterator(fullPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) if msg.Redundancy.Type == models.RedundancyRep { repInfo, err := msg.Redundancy.ToRepInfo() @@ -190,14 +206,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext]( - stgcmd.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, stgcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: &config.Cfg().ID, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -208,14 +217,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext]( - stgcmd.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, config.Cfg().ECPacketSize, stgcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: &config.Cfg().ID, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -231,17 +233,16 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) } - wrapTask := tsk.Body().(*stgcmd.TaskWrapper[mytask.TaskContext]) - if tsk.Error() != nil { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0)) } - if repTask, ok := wrapTask.InnerTask().(*stgcmd.CreateRepPackage); ok { + // TODO 避免判断类型 + if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID)) } - if ecTask, ok := wrapTask.InnerTask().(*stgcmd.CreateECPackage); ok { + if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID)) } diff --git a/internal/task/create_ec_package.go b/internal/task/create_ec_package.go new file mode 100644 index 0000000..fb69e8c --- /dev/null +++ b/internal/task/create_ec_package.go @@ -0,0 +1,38 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-agent/internal/config" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateECPackageResult = cmd.CreateECPackageResult + +type CreateECPackage struct { + cmd cmd.CreateECPackage + + Result *CreateECPackageResult +} + +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { + return &CreateECPackage{ + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ + UpdatePackageContext: &cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }, + ECPacketSize: config.Cfg().ECPacketSize, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/create_rep_package.go b/internal/task/create_rep_package.go new file mode 100644 index 0000000..7b15c64 --- /dev/null +++ b/internal/task/create_rep_package.go @@ -0,0 +1,34 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateRepPackageResult = cmd.CreateRepPackageResult + +type CreateRepPackage struct { + cmd cmd.CreateRepPackage + + Result *CreateRepPackageResult +} + +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { + return &CreateRepPackage{ + cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/download_package.go b/internal/task/download_package.go new file mode 100644 index 0000000..c3c39ab --- /dev/null +++ b/internal/task/download_package.go @@ -0,0 +1,26 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" +) + +type DownloadPackage struct { + cmd *cmd.DownloadPackage +} + +func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { + return &DownloadPackage{ + cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), + } +} +func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.cmd.Execute(&cmd.DownloadPackageContext{ + Distlock: ctx.distlock, + }) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/ipfs_pin.go b/internal/task/ipfs_pin.go index 1a7fe7a..0489d5c 100644 --- a/internal/task/ipfs_pin.go +++ b/internal/task/ipfs_pin.go @@ -5,6 +5,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" ) type IPFSPin struct { @@ -31,7 +32,19 @@ func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) { log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") - err := ctx.ipfs.Pin(t.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + err := fmt.Errorf("new ipfs client: %w", err) + log.Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer ipfsCli.Close() + + err = ipfsCli.Pin(t.FileHash) if err != nil { err := fmt.Errorf("pin file failed, err: %w", err) log.WithField("FileHash", t.FileHash).Warn(err.Error()) diff --git a/internal/task/ipfs_read.go b/internal/task/ipfs_read.go index 91fb877..7ddfa6f 100644 --- a/internal/task/ipfs_read.go +++ b/internal/task/ipfs_read.go @@ -8,6 +8,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" ) type IPFSRead struct { @@ -61,7 +62,19 @@ func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) { } defer outputFile.Close() - rd, err := ctx.ipfs.OpenRead(t.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + err := fmt.Errorf("new ipfs client: %w", err) + log.Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer ipfsCli.Close() + + rd, err := ipfsCli.OpenRead(t.FileHash) if err != nil { err := fmt.Errorf("read ipfs file failed, err: %w", err) log.WithField("FileHash", t.FileHash).Warn(err.Error()) diff --git a/internal/task/task.go b/internal/task/task.go index 6bbd84f..3653981 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -3,14 +3,10 @@ package task import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/common/utils/ipfs" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type TaskContext struct { - ipfs *ipfs.IPFS - coordinator *coormq.Client - distlock *distsvc.Service + distlock *distsvc.Service } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -25,10 +21,8 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(ipfs *ipfs.IPFS, coorCli *coormq.Client, distLock *distsvc.Service) Manager { +func NewManager(distlock *distsvc.Service) Manager { return task.NewManager(TaskContext{ - ipfs: ipfs, - coordinator: coorCli, - distlock: distLock, + distlock: distlock, }) } diff --git a/main.go b/main.go index b568b39..396b052 100644 --- a/main.go +++ b/main.go @@ -8,18 +8,17 @@ import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" log "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" - agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" + "gitlink.org.cn/cloudream/storage-common/globals" + agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" "google.golang.org/grpc" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd" grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc" + cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/mq" ) // TODO 此数据是否在运行时会发生变化? @@ -41,15 +40,12 @@ func main() { os.Exit(1) } - ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS) - if err != nil { - log.Fatalf("new ipfs failed, err: %s", err.Error()) - } - - coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Fatalf("new ipfs failed, err: %s", err.Error()) - } + globals.InitLocal(&config.Cfg().Local) + globals.InitMQPool(&config.Cfg().RabbitMQ) + globals.InitAgentRPCPool(&agtrpc.PoolConfig{ + Port: config.Cfg().GRPC.Port, + }) + globals.InitIPFSPool(&config.Cfg().IPFS) distlock, err := distsvc.NewService(&config.Cfg().DistLock) if err != nil { @@ -60,11 +56,11 @@ func main() { wg := sync.WaitGroup{} wg.Add(5) - taskMgr := task.NewManager(ipfs, coorCli, distlock) + taskMgr := task.NewManager(distlock) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(ipfs, &taskMgr, coorCli), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -76,14 +72,14 @@ func main() { go reportStatus(&wg) //网络延迟感知 //面向客户端收发数据 - listenAddr := config.Cfg().GRPCListenAddress + listenAddr := config.Cfg().GRPC.MakeListenAddress() lis, err := net.Listen("tcp", listenAddr) if err != nil { log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } s := grpc.NewServer() - agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs)) + agtrpc.RegisterAgentServer(s, grpcsvc.NewService()) go serveGRPC(s, lis, &wg) go serveDistLock(distlock) diff --git a/status_report.go b/status_report.go index e4ebc71..c7ebbe7 100644 --- a/status_report.go +++ b/status_report.go @@ -7,13 +7,12 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-common/consts" - coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage-common/utils" ) func reportStatus(wg *sync.WaitGroup) { - coorCli, err := coorcli.NewClient(&config.Cfg().RabbitMQ) + coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) if err != nil { wg.Done() log.Error("new coordinator client failed, err: %w", err) @@ -56,7 +55,7 @@ func reportStatus(wg *sync.WaitGroup) { //发送心跳 // TODO 由于数据结构未定,暂时不发送真实数据 - coorCli.AgentStatusReport(coormsg.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) + coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) time.Sleep(time.Minute * 5) } From a2f195322e759e2a6627164f819bf77bd1f135e2 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Aug 2023 16:48:47 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=8B=86=E5=88=86=E9=94=81=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/mq/storage.go | 60 ++++++++++++++++----------------- main.go | 6 ++-- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/internal/services/mq/storage.go b/internal/services/mq/storage.go index 06711ed..3aeeabb 100644 --- a/internal/services/mq/storage.go +++ b/internal/services/mq/storage.go @@ -2,28 +2,26 @@ package mq import ( "io/fs" - "io/ioutil" "os" "path/filepath" "time" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/globals" - "gitlink.org.cn/cloudream/storage-common/utils" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage-common/utils" ) -func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) { +func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { coorCli, err := globals.CoordinatorMQPool.Acquire() if err != nil { logger.Warnf("new coordinator client: %s", err.Error()) @@ -40,7 +38,7 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageMovePackageDirName(msg.PackageID, msg.UserID)) + outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID)) if err = os.MkdirAll(outputDirPath, 0755); err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("creating output directory: %s", err.Error()) @@ -49,15 +47,15 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) } tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) - return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID())) + return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) } -func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*agtmq.WaitStorageMovePackageResp, *mq.CodeMessage) { - logger.WithField("TaskID", msg.TaskID).Debugf("wait moving package") +func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*agtmq.WaitStorageLoadPackageResp, *mq.CodeMessage) { + logger.WithField("TaskID", msg.TaskID).Debugf("wait loading package") tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return mq.ReplyFailed[agtmq.WaitStorageMovePackageResp](errorcode.TaskNotFound, "task not found") + return mq.ReplyFailed[agtmq.WaitStorageLoadPackageResp](errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { @@ -68,7 +66,7 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -78,17 +76,17 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (* errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) } - return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(false, "")) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "")) } } func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) - infos, err := ioutil.ReadDir(dirFullPath) + infos, err := os.ReadDir(dirFullPath) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) return mq.ReplyOK(agtmq.NewStorageCheckResp( @@ -97,30 +95,30 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe )) } - fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() }) + dirInfos := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() }) if msg.IsComplete { - return svc.checkStorageComplete(msg, fileInfos) + return svc.checkStorageComplete(msg, dirInfos) } else { - return svc.checkStorageIncrement(msg, fileInfos) + return svc.checkStorageIncrement(msg, dirInfos) } } -func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) { - infosMap := make(map[string]fs.FileInfo) - for _, info := range fileInfos { +func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) { + infosMap := make(map[string]fs.DirEntry) + for _, info := range dirInfos { infosMap[info.Name()] = info } var entries []agtmq.StorageCheckRespEntry for _, obj := range msg.Packages { - fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID) - _, ok := infosMap[fileName] + dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) + _, ok := infosMap[dirName] if ok { // 不需要做处理 // 删除map中的记录,表示此记录已被检查过 - delete(infosMap, fileName) + delete(infosMap, dirName) } else { // 只要文件不存在,就删除StoragePackage表中的记录 @@ -133,22 +131,22 @@ func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []f return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) } -func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) { +func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) { - infosMap := make(map[string]fs.FileInfo) - for _, info := range fileInfos { + infosMap := make(map[string]fs.DirEntry) + for _, info := range dirInfos { infosMap[info.Name()] = info } var entries []agtmq.StorageCheckRespEntry for _, obj := range msg.Packages { - fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID) - _, ok := infosMap[fileName] + dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) + _, ok := infosMap[dirName] if ok { // 不需要做处理 // 删除map中的记录,表示此记录已被检查过 - delete(infosMap, fileName) + delete(infosMap, dirName) } else { // 只要文件不存在,就删除StoragePackage表中的记录 @@ -176,7 +174,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") } - fullPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path) + fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)) var uploadFilePathes []string err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { diff --git a/main.go b/main.go index 396b052..2536aea 100644 --- a/main.go +++ b/main.go @@ -6,11 +6,11 @@ import ( "os" "sync" - distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" "google.golang.org/grpc" @@ -47,7 +47,7 @@ func main() { }) globals.InitIPFSPool(&config.Cfg().IPFS) - distlock, err := distsvc.NewService(&config.Cfg().DistLock) + distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { log.Fatalf("new ipfs failed, err: %s", err.Error()) } @@ -115,7 +115,7 @@ func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) { wg.Done() } -func serveDistLock(svc *distsvc.Service) { +func serveDistLock(svc *distlock.Service) { log.Info("start serving distlock") err := svc.Serve() From dca2f02ab10e1ead6f5ebb05a6fb7c65ce9407f5 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 23 Aug 2023 16:11:01 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/services/mq/{ipfs.go => cache.go} | 57 ++++++++++-- internal/task/cache_move_package.go | 102 +++++++++++++++++++++ internal/task/create_ec_package.go | 5 + internal/task/create_rep_package.go | 5 + 4 files changed, 159 insertions(+), 10 deletions(-) rename internal/services/mq/{ipfs.go => cache.go} (60%) create mode 100644 internal/task/cache_move_package.go diff --git a/internal/services/mq/ipfs.go b/internal/services/mq/cache.go similarity index 60% rename from internal/services/mq/ipfs.go rename to internal/services/mq/cache.go index fa101ae..ce76401 100644 --- a/internal/services/mq/ipfs.go +++ b/internal/services/mq/cache.go @@ -10,23 +10,24 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" + mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" ) -func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { ipfsCli, err := globals.IPFSPool.Acquire() if err != nil { logger.Warnf("new ipfs client: %s", err.Error()) - return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "new ipfs client failed") + return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "new ipfs client failed") } defer ipfsCli.Close() filesMap, err := ipfsCli.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") + return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "get pinned files from ipfs failed") } // TODO 根据锁定清单过滤被锁定的文件的记录 @@ -37,7 +38,7 @@ func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.C } } -func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -61,7 +62,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { - entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } } @@ -69,10 +70,10 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she // 增量情况下,不需要对filesMap中没检查的记录进行处理 - return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) + return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) } -func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -96,7 +97,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel } else if cache.State == consts.CacheStateTemp { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { - entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } } @@ -109,8 +110,44 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel if err != nil { logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) } - entries = append(entries, agtmq.NewCheckIPFSRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP)) + entries = append(entries, agtmq.NewCheckCacheRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP)) } - return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) + return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) +} + +func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) { + tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID)) + return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID())) +} + +func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) { + tsk := svc.taskManager.FindByID(msg.TaskID) + if tsk == nil { + return mq.ReplyFailed[agtmq.WaitCacheMovePackageResp](errorcode.TaskNotFound, "task not found") + } + + if msg.WaitTimeoutMs == 0 { + tsk.Wait() + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + + } else { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) + } + + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) + } } diff --git a/internal/task/cache_move_package.go b/internal/task/cache_move_package.go new file mode 100644 index 0000000..26cc3a7 --- /dev/null +++ b/internal/task/cache_move_package.go @@ -0,0 +1,102 @@ +package task + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +type CacheMovePackage struct { + userID int64 + packageID int64 +} + +func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { + return &CacheMovePackage{ + userID: userID, + packageID: packageID, + } +} + +func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.do(ctx) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} + +func (t *CacheMovePackage) do(ctx TaskContext) error { + log := logger.WithType[CacheMovePackage]("Task") + log.Debugf("begin with %v", logger.FormatStruct(t)) + defer log.Debugf("end") + + // TOOD EC的锁 + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 读取Package信息和包含的Object信息 + Package().ReadOne(t.packageID).Object().ReadAny(). + // 读取Rep对象的配置 + ObjectRep().ReadAny(). + // 创建Cache记录 + Cache().CreateAny(). + IPFS(). + // pin文件 + CreateAnyRep(*globals.Local.NodeID). + MutexLock(ctx.distlock) + if err != nil { + return fmt.Errorf("acquiring distlock: %w", err) + } + defer mutex.Unlock() + + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) + if err != nil { + return fmt.Errorf("getting package: %w", err) + } + + if pkgResp.Redundancy.IsRepInfo() { + return t.moveRep(ctx, coorCli, pkgResp.Package) + } else { + // TODO EC的CacheMove逻辑 + } + + return nil +} +func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, pkg model.Package) error { + getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID)) + if err != nil { + return fmt.Errorf("getting package object rep data: %w", err) + } + + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + var fileHashes []string + for _, rep := range getRepResp.Data { + if err := ipfsCli.Pin(rep.FileHash); err != nil { + return fmt.Errorf("pinning file %s: %w", rep.FileHash, err) + } + + fileHashes = append(fileHashes, rep.FileHash) + } + + _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes)) + if err != nil { + return fmt.Errorf("reporting cache package moved: %w", err) + } + + return nil +} diff --git a/internal/task/create_ec_package.go b/internal/task/create_ec_package.go index fb69e8c..61fad01 100644 --- a/internal/task/create_ec_package.go +++ b/internal/task/create_ec_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" @@ -24,6 +25,10 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera } func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + log := logger.WithType[CreateECPackage]("Task") + log.Debugf("begin") + defer log.Debugf("end") + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ UpdatePackageContext: &cmd.UpdatePackageContext{ Distlock: ctx.distlock, diff --git a/internal/task/create_rep_package.go b/internal/task/create_rep_package.go index 7b15c64..1ab9401 100644 --- a/internal/task/create_rep_package.go +++ b/internal/task/create_rep_package.go @@ -4,6 +4,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" ) @@ -23,6 +24,10 @@ func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iter } func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + log := logger.WithType[CreateRepPackage]("Task") + log.Debugf("begin") + defer log.Debugf("end") + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, })