diff --git a/internal/cmdline/commandline.go b/internal/cmdline/commandline.go index 4d825d2..0533bd1 100644 --- a/internal/cmdline/commandline.go +++ b/internal/cmdline/commandline.go @@ -5,8 +5,6 @@ import ( "os" "gitlink.org.cn/cloudream/common/pkgs/cmdtrie" - distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" - "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-client/internal/services" ) @@ -17,16 +15,12 @@ type CommandContext struct { var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]() type Commandline struct { - Svc *services.Service - DistLock *distlocksvc.Service - IPFS *ipfs.IPFS + Svc *services.Service } -func NewCommandline(svc *services.Service, distLock *distlocksvc.Service, ipfs *ipfs.IPFS) (*Commandline, error) { +func NewCommandline(svc *services.Service) (*Commandline, error) { return &Commandline{ - Svc: svc, - DistLock: distLock, - IPFS: ipfs, + Svc: svc, }, nil } diff --git a/internal/cmdline/distlock.go b/internal/cmdline/distlock.go index d0c1bfd..277e850 100644 --- a/internal/cmdline/distlock.go +++ b/internal/cmdline/distlock.go @@ -6,8 +6,8 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/distlock/lockprovider" "gitlink.org.cn/cloudream/common/pkgs/distlock/service" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" ) func DistLockLock(ctx CommandContext, lockData []string) error { @@ -22,7 +22,7 @@ func DistLockLock(ctx CommandContext, lockData []string) error { req.Locks = append(req.Locks, l) } - reqID, err := ctx.Cmdline.DistLock.Acquire(req, service.AcquireOption{ + reqID, err := ctx.Cmdline.Svc.DistLock.Acquire(req, service.AcquireOption{ RetryTimeMs: 5000, }) if err != nil { @@ -62,7 +62,7 @@ func parseOneLock(lockData string) (distlock.Lock, error) { } func DistLockUnlock(ctx CommandContext, reqID string) error { - return ctx.Cmdline.DistLock.Release(reqID) + return ctx.Cmdline.Svc.DistLock.Release(reqID) } func init() { diff --git a/internal/cmdline/package.go b/internal/cmdline/package.go index c169a97..65b4415 100644 --- a/internal/cmdline/package.go +++ b/internal/cmdline/package.go @@ -9,6 +9,7 @@ import ( "github.com/jedib0t/go-pretty/v6/table" "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" ) func PackageListBucketPackages(ctx CommandContext, bucketID int64) error { @@ -46,17 +47,23 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 defer objIter.Close() for { - objInfo, ok := objIter.MoveNext() - if !ok { + objInfo, err := objIter.MoveNext() + if err == iterator.ErrNoMoreItem { break } - - if objInfo.Error != nil { - return objInfo.Error + if err != nil { + return err } defer objInfo.File.Close() - outputFile, err := os.Create(filepath.Join(outputDir, objInfo.Object.Path)) + fullPath := filepath.Join(outputDir, objInfo.Object.Path) + + dirPath := filepath.Dir(fullPath) + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("creating object dir: %w", err) + } + + outputFile, err := os.Create(fullPath) if err != nil { return fmt.Errorf("creating object file: %w", err) } @@ -72,6 +79,8 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 } func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int) error { + rootPath = filepath.Clean(rootPath) + var uploadFilePathes []string err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { if err != nil { @@ -88,7 +97,7 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64 return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, models.NewRepRedundancyInfo(repCount)) if err != nil { @@ -141,7 +150,7 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter) if err != nil { return fmt.Errorf("update object %d failed, err: %w", packageID, err) @@ -180,7 +189,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, models.NewECRedundancyInfo(ecName)) if err != nil { @@ -233,7 +242,7 @@ func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - objIter := myos.NewUploadingObjectIterator(rootPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter) if err != nil { return fmt.Errorf("update package %d failed, err: %w", packageID, err) diff --git a/internal/cmdline/storage.go b/internal/cmdline/storage.go index 266543c..a8a22f9 100644 --- a/internal/cmdline/storage.go +++ b/internal/cmdline/storage.go @@ -7,14 +7,14 @@ import ( "gitlink.org.cn/cloudream/common/models" ) -func StorageMovePackage(ctx CommandContext, packageID int64, storageID int64) error { - taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageMovePackage(0, packageID, storageID) +func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) error { + taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID) if err != nil { - return fmt.Errorf("start moving package to storage: %w", err) + return fmt.Errorf("start loading package to storage: %w", err) } for { - complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10) + complete, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) @@ -54,7 +54,7 @@ func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, st } func init() { - commands.MustAdd(StorageMovePackage, "storage", "move", "pkg") + commands.MustAdd(StorageLoadPackage, "stg", "load", "pkg") - commands.MustAdd(StorageCreateRepPackage, "storage", "upload", "rep") + commands.MustAdd(StorageCreateRepPackage, "stg", "upload", "rep") } diff --git a/internal/config/config.go b/internal/config/config.go index c65dad9..e4847eb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,22 +2,23 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/config" - "gitlink.org.cn/cloudream/common/utils/ipfs" + stgmodels "gitlink.org.cn/cloudream/storage-common/models" + agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Config struct { - GRPCPort int `json:"grpcPort"` - ECPacketSize int64 `json:"ecPacketSize"` - MaxRepCount int `json:"maxRepCount"` - LocalIP string `json:"localIP"` - ExternalIP string `json:"externalIP"` - Logger logger.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon - DistLock distlock.Config `json:"distlock"` + Local stgmodels.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + ECPacketSize int64 `json:"ecPacketSize"` + MaxRepCount int `json:"maxRepCount"` + Logger logger.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/internal/http/object.go b/internal/http/object.go new file mode 100644 index 0000000..671dc26 --- /dev/null +++ b/internal/http/object.go @@ -0,0 +1,70 @@ +package http + +import ( + "io" + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + myio "gitlink.org.cn/cloudream/common/utils/io" +) + +type ObjectService struct { + *Server +} + +func (s *Server) ObjectSvc() *ObjectService { + return &ObjectService{ + Server: s, + } +} + +type ObjectDownloadReq struct { + UserID *int64 `form:"userID" binding:"required"` + ObjectID *int64 `form:"objectID" binding:"required"` +} + +func (s *ObjectService) Download(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Download") + + var req ObjectDownloadReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + file, err := s.svc.ObjectSvc().Download(*req.UserID, *req.ObjectID) + if err != nil { + log.Warnf("downloading object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) + return + } + + ctx.Writer.WriteHeader(http.StatusOK) + // TODO 需要设置FileName + ctx.Header("Content-Disposition", "attachment; filename=filename") + ctx.Header("Content-Type", "application/octet-stream") + + buf := make([]byte, 4096) + ctx.Stream(func(w io.Writer) bool { + rd, err := file.Read(buf) + if err == io.EOF { + return false + } + + if err != nil { + log.Warnf("reading file data: %s", err.Error()) + return false + } + + err = myio.WriteAll(w, buf[:rd]) + if err != nil { + log.Warnf("writing data to response: %s", err.Error()) + return false + } + + return true + }) +} diff --git a/internal/http/package.go b/internal/http/package.go index d2890bc..bae4170 100644 --- a/internal/http/package.go +++ b/internal/http/package.go @@ -1,7 +1,6 @@ package http import ( - "io" "mime/multipart" "net/http" "time" @@ -24,55 +23,6 @@ func (s *Server) PackageSvc() *PackageService { } } -type PackageDownloadReq struct { - UserID *int64 `form:"userID" binding:"required"` - PackageID *int64 `form:"packageID" binding:"required"` -} - -func (s *PackageService) Download(ctx *gin.Context) { - log := logger.WithField("HTTP", "Package.Download") - - var req PackageDownloadReq - if err := ctx.ShouldBindQuery(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - return - } - - file, err := s.svc.PackageSvc().DownloadPackage(*req.UserID, *req.PackageID) - if err != nil { - log.Warnf("downloading package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download package failed")) - return - } - - ctx.Writer.WriteHeader(http.StatusOK) - // TODO 需要设置FileName - ctx.Header("Content-Disposition", "attachment; filename=filename") - ctx.Header("Content-Type", "application/octet-stream") - - buf := make([]byte, 4096) - ctx.Stream(func(w io.Writer) bool { - rd, err := file.Read(buf) - if err == io.EOF { - return false - } - - if err != nil { - log.Warnf("reading file data: %s", err.Error()) - return false - } - - err = myio.WriteAll(w, buf[:rd]) - if err != nil { - log.Warnf("writing data to response: %s", err.Error()) - return false - } - - return true - }) -} - type PackageUploadReq struct { Info PackageUploadInfo `form:"info" binding:"required"` Files []*multipart.FileHeader `form:"files"` diff --git a/internal/http/server.go b/internal/http/server.go index 72738e0..996c676 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -38,9 +38,9 @@ func (s *Server) Serve() error { } func (s *Server) initRouters() { - s.engine.GET("/package/download", s.PackageSvc().Download) + s.engine.GET("/object/download", s.ObjectSvc().Download) s.engine.POST("/package/upload", s.PackageSvc().Upload) s.engine.POST("/package/delete", s.PackageSvc().Delete) - s.engine.POST("/storage/movePackage", s.StorageSvc().MovePackage) + s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage) } diff --git a/internal/http/storage.go b/internal/http/storage.go index d64065a..8574fa2 100644 --- a/internal/http/storage.go +++ b/internal/http/storage.go @@ -19,35 +19,35 @@ func (s *Server) StorageSvc() *StorageService { } } -type StorageMovePackageReq struct { +type StorageLoadPackageReq struct { UserID *int64 `json:"userID" binding:"required"` PackageID *int64 `json:"packageID" binding:"required"` StorageID *int64 `json:"storageID" binding:"required"` } -func (s *StorageService) MovePackage(ctx *gin.Context) { - log := logger.WithField("HTTP", "Storage.MovePackage") +func (s *StorageService) LoadPackage(ctx *gin.Context) { + log := logger.WithField("HTTP", "Storage.LoadPackage") - var req StorageMovePackageReq + var req StorageLoadPackageReq if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - taskID, err := s.svc.StorageSvc().StartStorageMovePackage(*req.UserID, *req.PackageID, *req.StorageID) + taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID) if err != nil { - log.Warnf("start storage move package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed")) + log.Warnf("start storage load package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed")) return } for { - complete, err := s.svc.StorageSvc().WaitStorageMovePackage(taskID, time.Second*10) + complete, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) if complete { if err != nil { - log.Warnf("moving complete with: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed")) + log.Warnf("loading complete with: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed")) return } @@ -56,8 +56,8 @@ func (s *StorageService) MovePackage(ctx *gin.Context) { } if err != nil { - log.Warnf("wait moving: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage move package failed")) + log.Warnf("wait loadding: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed")) return } } diff --git a/internal/services/bucket.go b/internal/services/bucket.go index 5b843ab..adedf5a 100644 --- a/internal/services/bucket.go +++ b/internal/services/bucket.go @@ -3,8 +3,9 @@ package services import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) @@ -22,7 +23,13 @@ func (svc *BucketService) GetBucket(userID int64, bucketID int64) (model.Bucket, } func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { - resp, err := svc.coordinator.GetUserBuckets(coormq.NewGetUserBuckets(userID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + resp, err := coorCli.GetUserBuckets(coormq.NewGetUserBuckets(userID)) if err != nil { return nil, fmt.Errorf("get user buckets failed, err: %w", err) } @@ -31,7 +38,13 @@ func (svc *BucketService) GetUserBuckets(userID int64) ([]model.Bucket, error) { } func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]model.Package, error) { - resp, err := svc.coordinator.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + resp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) if err != nil { return nil, fmt.Errorf("get bucket packages failed, err: %w", err) } @@ -40,6 +53,12 @@ func (svc *BucketService) GetBucketPackages(userID int64, bucketID int64) ([]mod } func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return 0, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + // TODO 只有阅读了系统操作的源码,才能知道要加哪些锁,但用户的命令可能会调用不止一个系统操作。 // 因此加锁的操作还是必须在用户命令里完成,但具体加锁的内容,则需要被封装起来与系统操作放到一起,方便管理,避免分散改动。 @@ -47,13 +66,13 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, Metadata().Bucket().CreateOne(userID, bucketName). // TODO 可以考虑二次加锁,加的更精确 UserBucket().CreateAny(). - MutexLock(svc.distlock) + MutexLock(svc.DistLock) if err != nil { return 0, fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex.Unlock() - resp, err := svc.coordinator.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) + resp, err := coorCli.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) if err != nil { return 0, fmt.Errorf("creating bucket: %w", err) } @@ -62,24 +81,30 @@ func (svc *BucketService) CreateBucket(userID int64, bucketName string) (int64, } func (svc *BucketService) DeleteBucket(userID int64, bucketID int64) error { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + // TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁 mutex, err := reqbuilder.NewBuilder(). Metadata(). UserBucket().WriteAny(). Bucket().WriteOne(bucketID). - // TODO2 + Package().WriteAny(). Object().WriteAny(). ObjectRep().WriteAny(). ObjectBlock().WriteAny(). - StorageObject().WriteAny(). - MutexLock(svc.distlock) + StoragePackage().WriteAny(). + MutexLock(svc.DistLock) if err != nil { return fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex.Unlock() - _, err = svc.coordinator.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) + _, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) } diff --git a/internal/services/object.go b/internal/services/object.go new file mode 100644 index 0000000..94c6de7 --- /dev/null +++ b/internal/services/object.go @@ -0,0 +1,15 @@ +package services + +import "io" + +type ObjectService struct { + *Service +} + +func (svc *Service) ObjectSvc() *ObjectService { + return &ObjectService{Service: svc} +} + +func (svc *ObjectService) Download(userID int64, objectID int64) (io.ReadCloser, error) { + panic("not implement yet!") +} diff --git a/internal/services/package.go b/internal/services/package.go index c7ef8e6..69c193d 100644 --- a/internal/services/package.go +++ b/internal/services/package.go @@ -8,7 +8,10 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/storage-client/internal/config" mytask "gitlink.org.cn/cloudream/storage-client/internal/task" + "gitlink.org.cn/cloudream/storage-common/globals" agtcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) @@ -22,197 +25,193 @@ func (svc *Service) PackageSvc() *PackageService { } func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) { - /* - TODO2 - // TODO zkx 需要梳理EC锁涉及的锁,补充下面漏掉的部分 - mutex, err := reqbuilder.NewBuilder(). - // 用于判断用户是否有对象权限 - Metadata().UserBucket().ReadAny(). - // 用于查询可用的下载节点 - Node().ReadAny(). - // 用于读取文件信息 - Object().ReadOne(objectID). - // 用于查询Rep配置 - ObjectRep().ReadOne(objectID). - // 用于查询Block配置 - ObjectBlock().ReadAny(). - // 用于查询包含了副本的节点 - Cache().ReadAny(). - MutexLock(svc.distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - */ - getPkgResp, err := svc.coordinator.GetPackage(coormq.NewGetPackage(userID, packageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + mutex, err := reqbuilder.NewBuilder(). + // 用于判断用户是否有对象权限 + Metadata().UserBucket().ReadAny(). + // 用于查询可用的下载节点 + Node().ReadAny(). + // 用于读取包信息 + Package().ReadOne(packageID). + // 用于读取包内的文件信息 + Object().ReadAny(). + // 用于查询Rep配置 + ObjectRep().ReadAny(). + // 用于查询Block配置 + ObjectBlock().ReadAny(). + // 用于查询包含了副本的节点 + Cache().ReadAny(). + MutexLock(svc.DistLock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + + getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID)) if err != nil { return nil, fmt.Errorf("getting package: %w", err) } - getObjsResp, err := svc.coordinator.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) + getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) if err != nil { return nil, fmt.Errorf("getting package objects: %w", err) } if getPkgResp.Redundancy.Type == models.RedundancyRep { - getObjRepDataResp, err := svc.coordinator.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID)) + iter, err := svc.downloadRepPackage(packageID, getObjsResp.Objects, coorCli) + + if err != nil { + mutex.Unlock() + return nil, err + } + + iter.OnClosing = func() { + mutex.Unlock() + } + + return iter, nil + } else { + iter, err := svc.downloadECPackage(getPkgResp.Package, getObjsResp.Objects, coorCli) + if err != nil { - return nil, fmt.Errorf("getting package object rep data: %w", err) + mutex.Unlock() + return nil, err } - iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, svc.coordinator, svc.distlock, iterator.DownloadConfig{ - LocalIPFS: svc.ipfs, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }) + iter.OnClosing = func() { + mutex.Unlock() + } return iter, nil } +} - getObjECDataResp, err := svc.coordinator.GetPackageObjectECData(coormq.NewGetPackageObjectECData(packageID)) +func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.RepObjectIterator, error) { + getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID)) + if err != nil { + return nil, fmt.Errorf("getting package object rep data: %w", err) + } + + iter := iterator.NewRepObjectIterator(objects, getObjRepDataResp.Data, &iterator.DownloadContext{ + Distlock: svc.DistLock, + }) + + return iter, nil +} +func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.Object, coorCli *coormq.PoolClient) (*iterator.ECObjectIterator, error) { + getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(pkg.PackageID)) if err != nil { return nil, fmt.Errorf("getting package object ec data: %w", err) } var ecRed models.ECRedundancyInfo - if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil { + if err := serder.AnyToAny(pkg.Redundancy.Info, &ecRed); err != nil { return nil, fmt.Errorf("get ec redundancy info: %w", err) } - getECResp, err := svc.coordinator.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) + getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) if err != nil { return nil, fmt.Errorf("getting ec: %w", err) } - iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, svc.coordinator, svc.distlock, getECResp.Config, config.Cfg().ECPacketSize, iterator.DownloadConfig{ - LocalIPFS: svc.ipfs, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, + iter := iterator.NewECObjectIterator(objects, getObjECDataResp.Data, getECResp.Config, &iterator.ECDownloadContext{ + DownloadContext: &iterator.DownloadContext{ + Distlock: svc.DistLock, + }, + ECPacketSize: config.Cfg().ECPacketSize, }) return iter, nil } func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo models.RepRedundancyInfo) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewCreateRepPackage( - userID, bucketID, name, objIter, - repInfo, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo)) return tsk.ID(), nil } -func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) +func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *mytask.CreateRepPackageResult, error) { + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage) - return true, &cteatePkgTask.Result, tsk.Error() + cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage) + return true, cteatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewUpdateRepPackage( - userID, packageID, objIter, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewUpdateRepPackage(userID, packageID, objIter)) return tsk.ID(), nil } func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateRepPackage) - return true, &updatePkgTask.Result, tsk.Error() + updatePkgTask := tsk.Body().(*mytask.UpdateRepPackage) + return true, updatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo models.ECRedundancyInfo) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewCreateECPackage( - userID, bucketID, name, objIter, - ecInfo, - config.Cfg().ECPacketSize, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo)) return tsk.ID(), nil } func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.CreateRepPackage) - return true, &cteatePkgTask.Result, tsk.Error() + cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage) + return true, cteatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.taskMgr.StartNew(agtcmd.Wrap[mytask.TaskContext]( - agtcmd.NewUpdateECPackage( - userID, packageID, objIter, - config.Cfg().ECPacketSize, - agtcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: nil, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.TaskMgr.StartNew(mytask.NewUpdateECPackage(userID, packageID, objIter)) return tsk.ID(), nil } func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) { - tsk := svc.taskMgr.FindByID(taskID) + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*agtcmd.TaskWrapper[mytask.TaskContext]).InnerTask().(*agtcmd.UpdateECPackage) - return true, &updatePkgTask.Result, tsk.Error() + updatePkgTask := tsk.Body().(*mytask.UpdateECPackage) + return true, updatePkgTask.Result, tsk.Error() } return false, nil, nil } func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { - /* - // TODO2 - mutex, err := reqbuilder.NewBuilder(). - Metadata(). - // 用于判断用户是否有对象的权限 - UserBucket().ReadAny(). - // 用于读取、修改对象信息 - Object().WriteOne(objectID). - // 用于删除Rep配置 - ObjectRep().WriteOne(objectID). - // 用于删除Block配置 - ObjectBlock().WriteAny(). - // 用于修改Move此Object的记录的状态 - StorageObject().WriteAny(). - MutexLock(svc.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有对象的权限 + UserBucket().ReadAny(). + // 用于读取、修改包信息 + Package().WriteOne(packageID). + // 用于删除包内的所有文件 + Object().WriteAny(). + // 用于删除Rep配置 + ObjectRep().WriteAny(). + // 用于删除Block配置 + ObjectBlock().WriteAny(). + // 用于修改Move此Object的记录的状态 + StoragePackage().WriteAny(). + MutexLock(svc.DistLock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() - _, err := svc.coordinator.DeletePackage(coormq.NewDeletePackage(userID, packageID)) + _, err = coorCli.DeletePackage(coormq.NewDeletePackage(userID, packageID)) if err != nil { return fmt.Errorf("deleting package: %w", err) } diff --git a/internal/services/scanner.go b/internal/services/scanner.go index 58e568e..3743c60 100644 --- a/internal/services/scanner.go +++ b/internal/services/scanner.go @@ -2,6 +2,8 @@ package services import ( "fmt" + + "gitlink.org.cn/cloudream/storage-common/globals" ) type ScannerService struct { @@ -13,7 +15,13 @@ func (svc *Service) ScannerSvc() *ScannerService { } func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error { - err := svc.scanner.PostEvent(event, isEmergency, dontMerge) + scCli, err := globals.ScannerMQPool.Acquire() + if err != nil { + return fmt.Errorf("new scacnner client: %w", err) + } + defer scCli.Close() + + err = scCli.PostEvent(event, isEmergency, dontMerge) if err != nil { return fmt.Errorf("request to scanner failed, err: %w", err) } diff --git a/internal/services/service.go b/internal/services/service.go index 638fd3f..3f21616 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -2,26 +2,17 @@ package services import ( distlock "gitlink.org.cn/cloudream/common/pkgs/distlock/service" - "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-client/internal/task" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" ) type Service struct { - coordinator *coormq.Client - ipfs *ipfs.IPFS - scanner *scmq.Client - distlock *distlock.Service - taskMgr *task.Manager + DistLock *distlock.Service + TaskMgr *task.Manager } -func NewService(coorClient *coormq.Client, ipfsClient *ipfs.IPFS, scanner *scmq.Client, distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) { +func NewService(distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) { return &Service{ - coordinator: coorClient, - ipfs: ipfsClient, - scanner: scanner, - distlock: distlock, - taskMgr: taskMgr, + DistLock: distlock, + TaskMgr: taskMgr, }, nil } diff --git a/internal/services/storage.go b/internal/services/storage.go index 34a439e..9e9c44d 100644 --- a/internal/services/storage.go +++ b/internal/services/storage.go @@ -5,8 +5,8 @@ import ( "time" "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/task" + "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) @@ -19,13 +19,13 @@ func (svc *Service) StorageSvc() *StorageService { return &StorageService{Service: svc} } -func (svc *StorageService) StartStorageMovePackage(userID int64, packageID int64, storageID int64) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewStorageMovePackage(userID, packageID, storageID)) +func (svc *StorageService) StartStorageLoadPackage(userID int64, packageID int64, storageID int64) (string, error) { + tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID)) return tsk.ID(), nil } -func (svc *StorageService) WaitStorageMovePackage(taskID string, waitTimeout time.Duration) (bool, error) { - tsk := svc.taskMgr.FindByID(taskID) +func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, error) { + tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { return true, tsk.Error() } @@ -40,12 +40,18 @@ func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, s // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int64, name string, storageID int64, path string, redundancy models.TypedRedundancyInfo) (int64, string, error) { - stgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return 0, "", fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) if err != nil { return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := agtmq.NewClient(stgResp.NodeID, &config.Cfg().RabbitMQ) + agentCli, err := globals.AgentMQPool.Acquire(stgResp.NodeID) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } @@ -60,7 +66,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID int64, bucketID int6 } func (svc *StorageService) WaitStorageCreatePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, int64, error) { - agentCli, err := agtmq.NewClient(nodeID, &config.Cfg().RabbitMQ) + agentCli, err := globals.AgentMQPool.Acquire(nodeID) if err != nil { // TODO 失败是否要当做任务已经结束? return true, 0, fmt.Errorf("new agent client: %w", err) diff --git a/internal/task/create_ec_package.go b/internal/task/create_ec_package.go new file mode 100644 index 0000000..005abd4 --- /dev/null +++ b/internal/task/create_ec_package.go @@ -0,0 +1,38 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-client/internal/config" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateECPackageResult = cmd.CreateECPackageResult + +type CreateECPackage struct { + cmd cmd.CreateECPackage + + Result *CreateECPackageResult +} + +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { + return &CreateECPackage{ + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ + UpdatePackageContext: &cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }, + ECPacketSize: config.Cfg().ECPacketSize, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/create_rep_package.go b/internal/task/create_rep_package.go new file mode 100644 index 0000000..7b15c64 --- /dev/null +++ b/internal/task/create_rep_package.go @@ -0,0 +1,34 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateRepPackageResult = cmd.CreateRepPackageResult + +type CreateRepPackage struct { + cmd cmd.CreateRepPackage + + Result *CreateRepPackageResult +} + +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { + return &CreateRepPackage{ + cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/storage_load_package.go b/internal/task/storage_load_package.go new file mode 100644 index 0000000..977e105 --- /dev/null +++ b/internal/task/storage_load_package.go @@ -0,0 +1,108 @@ +package task + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" +) + +type StorageLoadPackage struct { + userID int64 + packageID int64 + storageID int64 +} + +func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage { + return &StorageLoadPackage{ + userID: userID, + packageID: packageID, + storageID: storageID, + } +} + +func (t *StorageLoadPackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.do(ctx) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} + +func (t *StorageLoadPackage) do(ctx TaskContext) error { + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有Storage权限 + UserStorage().ReadOne(t.packageID, t.storageID). + // 用于判断用户是否有对象权限 + UserBucket().ReadAny(). + // 用于读取包信息 + Package().ReadOne(t.packageID). + // 用于读取对象信息 + Object().ReadAny(). + // 用于查询Rep配置 + ObjectRep().ReadAny(). + // 用于查询Block配置 + ObjectBlock().ReadAny(). + // 用于创建Move记录 + StoragePackage().CreateOne(t.storageID, t.userID, t.packageID). + Storage(). + // 用于创建对象文件 + CreateOnePackage(t.storageID, t.userID, t.packageID). + MutexLock(ctx.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) + if err != nil { + return fmt.Errorf("getting storage info: %w", err) + } + + // 然后向代理端发送移动文件的请求 + agentClient, err := globals.AgentMQPool.Acquire(getStgResp.NodeID) + if err != nil { + return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) + } + defer agentClient.Close() + + agentMoveResp, err := agentClient.StartStorageLoadPackage( + agtmq.NewStartStorageLoadPackage( + t.userID, + t.packageID, + t.storageID, + )) + if err != nil { + return fmt.Errorf("start loading package to storage: %w", err) + } + + for { + waitResp, err := agentClient.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5)) + if err != nil { + return fmt.Errorf("wait loading package: %w", err) + } + + if waitResp.IsComplete { + if waitResp.Error != "" { + return fmt.Errorf("agent loading package: %s", waitResp.Error) + } + + break + } + } + + _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.packageID, t.storageID)) + if err != nil { + return fmt.Errorf("loading package to storage: %w", err) + } + return nil +} diff --git a/internal/task/storage_move_package.go b/internal/task/storage_move_package.go deleted file mode 100644 index 9f34b34..0000000 --- a/internal/task/storage_move_package.go +++ /dev/null @@ -1,101 +0,0 @@ -package task - -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/storage-client/internal/config" - agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" -) - -type StorageMovePackage struct { - userID int64 - packageID int64 - storageID int64 -} - -func NewStorageMovePackage(userID int64, packageID int64, storageID int64) *StorageMovePackage { - return &StorageMovePackage{ - userID: userID, - packageID: packageID, - storageID: storageID, - } -} - -func (t *StorageMovePackage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *StorageMovePackage) do(ctx TaskContext) error { - /* - TODO2 - mutex, err := reqbuilder.NewBuilder(). - Metadata(). - // 用于判断用户是否有Storage权限 - UserStorage().ReadOne(t.packageID, t.storageID). - // 用于判断用户是否有对象权限 - UserBucket().ReadAny(). - // 用于读取对象信息 - Object().ReadOne(t.packageID). - // 用于查询Rep配置 - ObjectRep().ReadOne(t.packageID). - // 用于查询Block配置 - ObjectBlock().ReadAny(). - // 用于创建Move记录 - StorageObject().CreateOne(t.storageID, t.userID, t.packageID). - Storage(). - // 用于创建对象文件 - CreateOneObject(t.storageID, t.userID, t.packageID). - MutexLock(ctx.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ - getStgResp, err := ctx.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) - if err != nil { - return fmt.Errorf("getting storage info: %w", err) - } - - // 然后向代理端发送移动文件的请求 - agentClient, err := agtmq.NewClient(getStgResp.NodeID, &config.Cfg().RabbitMQ) - if err != nil { - return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) - } - defer agentClient.Close() - - agentMoveResp, err := agentClient.StartStorageMovePackage( - agtmq.NewStartStorageMovePackage( - t.userID, - t.packageID, - t.storageID, - )) - if err != nil { - return fmt.Errorf("start moving package to storage: %w", err) - } - - for { - waitResp, err := agentClient.WaitStorageMovePackage(agtmq.NewWaitStorageMovePackage(agentMoveResp.TaskID, int64(time.Second)*5)) - if err != nil { - return fmt.Errorf("wait moving package: %w", err) - } - - if waitResp.IsComplete { - if waitResp.Error != "" { - return fmt.Errorf("agent moving package: %s", waitResp.Error) - } - - break - } - } - - _, err = ctx.Coordinator().PackageMovedToStorage(coormq.NewPackageMovedToStorage(t.userID, t.packageID, t.storageID)) - if err != nil { - return fmt.Errorf("moving package to storage: %w", err) - } - return nil -} diff --git a/internal/task/task.go b/internal/task/task.go index 69466e3..3653981 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -3,14 +3,10 @@ package task import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/common/utils/ipfs" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type TaskContext struct { - ipfs *ipfs.IPFS - distLock *distsvc.Service - coordinator *coormq.Client + distlock *distsvc.Service } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -25,10 +21,8 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(ipfs *ipfs.IPFS, distlock *distsvc.Service, coorCli *coormq.Client) Manager { +func NewManager(distlock *distsvc.Service) Manager { return task.NewManager(TaskContext{ - ipfs: ipfs, - distLock: distlock, - coordinator: coorCli, + distlock: distlock, }) } diff --git a/internal/task/update_ec_package.go b/internal/task/update_ec_package.go new file mode 100644 index 0000000..15035db --- /dev/null +++ b/internal/task/update_ec_package.go @@ -0,0 +1,38 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/storage-client/internal/config" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type UpdateECPackageResult = cmd.UpdateECPackageResult + +type UpdateECPackage struct { + cmd cmd.UpdateECPackage + + Result *UpdateECPackageResult +} + +func NewUpdateECPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateECPackage { + return &UpdateECPackage{ + cmd: *cmd.NewUpdateECPackage(userID, packageID, objectIter), + } +} + +func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ + UpdatePackageContext: &cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }, + ECPacketSize: config.Cfg().ECPacketSize, + }) + + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/update_rep_package.go b/internal/task/update_rep_package.go new file mode 100644 index 0000000..18646d1 --- /dev/null +++ b/internal/task/update_rep_package.go @@ -0,0 +1,34 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type UpdateRepPackageResult = cmd.UpdateRepPackageResult + +type UpdateRepPackage struct { + cmd cmd.UpdateRepPackage + + Result *UpdateRepPackageResult +} + +func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage { + return &UpdateRepPackage{ + cmd: *cmd.NewUpdateRepPackage(userID, packageID, objectIter), + } +} + +func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }) + + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/main.go b/main.go index 35fa37f..8546da5 100644 --- a/main.go +++ b/main.go @@ -6,15 +6,13 @@ import ( _ "google.golang.org/grpc/balancer/grpclb" - distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-client/internal/cmdline" "gitlink.org.cn/cloudream/storage-client/internal/config" "gitlink.org.cn/cloudream/storage-client/internal/services" "gitlink.org.cn/cloudream/storage-client/internal/task" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" + "gitlink.org.cn/cloudream/storage-common/globals" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock" ) func main() { @@ -24,77 +22,53 @@ func main() { os.Exit(1) } - err = log.Init(&config.Cfg().Logger) + err = logger.Init(&config.Cfg().Logger) if err != nil { fmt.Printf("init logger failed, err: %s", err.Error()) os.Exit(1) } - coorClient, err := coormq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Warnf("new coordinator client failed, err: %s", err.Error()) - os.Exit(1) - } - - scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Warnf("new scanner client failed, err: %s", err.Error()) - os.Exit(1) - } - - var ipfsCli *ipfs.IPFS + globals.InitLocal(&config.Cfg().Local) + globals.InitMQPool(&config.Cfg().RabbitMQ) + globals.InitAgentRPCPool(&config.Cfg().AgentGRPC) if config.Cfg().IPFS != nil { - log.Infof("IPFS config is not empty, so create a ipfs client") + logger.Infof("IPFS config is not empty, so create a ipfs client") - ipfsCli, err = ipfs.NewIPFS(config.Cfg().IPFS) - if err != nil { - log.Warnf("new ipfs client failed, err: %s", err.Error()) - os.Exit(1) - } + globals.InitIPFSPool(config.Cfg().IPFS) } - distlockSvc, err := distlocksvc.NewService(&config.Cfg().DistLock) + distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { - log.Warnf("new distlock service failed, err: %s", err.Error()) + logger.Warnf("new distlock service failed, err: %s", err.Error()) os.Exit(1) } go serveDistLock(distlockSvc) - taskMgr := task.NewManager(ipfsCli, distlockSvc, coorClient) + taskMgr := task.NewManager(distlockSvc) - svc, err := services.NewService(coorClient, ipfsCli, scanner, distlockSvc, &taskMgr) + svc, err := services.NewService(distlockSvc, &taskMgr) if err != nil { - log.Warnf("new services failed, err: %s", err.Error()) + logger.Warnf("new services failed, err: %s", err.Error()) os.Exit(1) } - cmds, err := cmdline.NewCommandline(svc, distlockSvc, ipfsCli) + cmds, err := cmdline.NewCommandline(svc) if err != nil { - log.Warnf("new command line failed, err: %s", err.Error()) + logger.Warnf("new command line failed, err: %s", err.Error()) os.Exit(1) } cmds.DispatchCommand(os.Args[1:]) - /* - TO DO future: - 1. ls命令,显示用户指定桶下的所有对象,及相关的元数据 - 2. rm命令,用户指定bucket和object名,执行删除操作 - 3. update命令,用户发起对象更新命令,查询元数据,判断对象的冗余方式,删除旧对象(unpin所有的副本或编码块),写入新对象 - 4. ipfsStat命令,查看本地有无ipfsdaemon,ipfs目录的使用率 - 5. ipfsFlush命令,unpin本地ipfs目录中的所有cid(block) - 6. 改为交互式client,输入用户名及秘钥后进入交互界面 - 7. 支持纯缓存类型的IPFS节点,数据一律存在后端存储服务中 - */ } -func serveDistLock(svc *distlocksvc.Service) { - log.Info("start serving distlock") +func serveDistLock(svc *distlock.Service) { + logger.Info("start serving distlock") err := svc.Serve() if err != nil { - log.Errorf("distlock stopped with error: %s", err.Error()) + logger.Errorf("distlock stopped with error: %s", err.Error()) } - log.Info("distlock stopped") + logger.Info("distlock stopped") }