diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index 941a5e2..5f87529 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -195,27 +195,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka } objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) - - if msg.Redundancy.IsRepInfo() { - repInfo, err := msg.Redundancy.ToRepInfo() - if err != nil { - logger.Warnf("getting rep redundancy info: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") - } - - tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, msg.NodeAffinity)) - return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) - } - - ecInfo, err := msg.Redundancy.ToECInfo() - if err != nil { - logger.Warnf("getting ec redundancy info: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") - } - - tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, msg.NodeAffinity)) + tsk := svc.taskManager.StartNew(mytask.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name, objIter, msg.NodeAffinity)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -235,14 +215,6 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0)) } - // TODO 避免判断类型 - if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok { - return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID)) - } - - if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok { - return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID)) - } - - return nil, mq.Failed(errorcode.TaskNotFound, "task not found") + taskBody := tsk.Body().(*mytask.CreatePackage) + return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", taskBody.Result.PackageID)) } diff --git a/agent/internal/task/create_ec_package.go b/agent/internal/task/create_ec_package.go index 8b3df9b..b70fae8 100644 --- a/agent/internal/task/create_ec_package.go +++ b/agent/internal/task/create_ec_package.go @@ -10,22 +10,22 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) -type CreateECPackageResult = cmd.CreateECPackageResult +type CreatePackageResult = cmd.CreatePackageResult -type CreateECPackage struct { - cmd cmd.CreateECPackage +type CreatePackage struct { + cmd cmd.CreatePackage - Result *CreateECPackageResult + Result *CreatePackageResult } -func NewCreateECPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreateECPackage { - return &CreateECPackage{ - cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, nodeAffinity), +func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { + return &CreatePackage{ + cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), } } -func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - log := logger.WithType[CreateECPackage]("Task") +func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { + log := logger.WithType[CreatePackage]("Task") log.Debugf("begin") defer log.Debugf("end") diff --git a/client/internal/cmdline/bucket.go b/client/internal/cmdline/bucket.go index c96a234..a35dd56 100644 --- a/client/internal/cmdline/bucket.go +++ b/client/internal/cmdline/bucket.go @@ -4,10 +4,11 @@ import ( "fmt" "github.com/jedib0t/go-pretty/v6/table" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) func BucketListUserBuckets(ctx CommandContext) error { - userID := int64(0) + userID := cdssdk.UserID(0) buckets, err := ctx.Cmdline.Svc.BucketSvc().GetUserBuckets(userID) if err != nil { @@ -28,7 +29,7 @@ func BucketListUserBuckets(ctx CommandContext) error { } func BucketCreateBucket(ctx CommandContext, bucketName string) error { - userID := int64(0) + userID := cdssdk.UserID(0) bucketID, err := ctx.Cmdline.Svc.BucketSvc().CreateBucket(userID, bucketName) if err != nil { @@ -39,8 +40,8 @@ func BucketCreateBucket(ctx CommandContext, bucketName string) error { return nil } -func BucketDeleteBucket(ctx CommandContext, bucketID int64) error { - userID := int64(0) +func BucketDeleteBucket(ctx CommandContext, bucketID cdssdk.BucketID) error { + userID := cdssdk.UserID(0) err := ctx.Cmdline.Svc.BucketSvc().DeleteBucket(userID, bucketID) if err != nil { diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index 1a849ce..d4e358c 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -3,9 +3,11 @@ package cmdline import ( "fmt" "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -func CacheMovePackage(ctx CommandContext, packageID int64, nodeID int64) error { +func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(0, packageID, nodeID) if err != nil { return fmt.Errorf("start cache moving package: %w", err) diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index 10b824c..b39ecb6 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -12,8 +12,8 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) -func PackageListBucketPackages(ctx CommandContext, bucketID int64) error { - userID := int64(0) +func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error { + userID := cdssdk.UserID(0) packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID) if err != nil { @@ -23,17 +23,17 @@ func PackageListBucketPackages(ctx CommandContext, bucketID int64) error { fmt.Printf("Find %d packages in bucket %d for user %d:\n", len(packages), bucketID, userID) tb := table.NewWriter() - tb.AppendHeader(table.Row{"ID", "Name", "BucketID", "State", "Redundancy"}) + tb.AppendHeader(table.Row{"ID", "Name", "BucketID", "State"}) for _, obj := range packages { - tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID, obj.State, obj.Redundancy}) + tb.AppendRow(table.Row{obj.PackageID, obj.Name, obj.BucketID, obj.State}) } - fmt.Print(tb.Render()) + fmt.Println(tb.Render()) return nil } -func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int64) error { +func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID cdssdk.PackageID) error { err := os.MkdirAll(outputDir, os.ModePerm) if err != nil { return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) @@ -86,7 +86,7 @@ func PackageDownloadPackage(ctx CommandContext, outputDir string, packageID int6 return nil } -func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64, name string, repCount int, nodeAffinity []int64) error { +func PackageCreatePackage(ctx CommandContext, rootPath string, bucketID cdssdk.BucketID, name string, nodeAffinity []cdssdk.NodeID) error { rootPath = filepath.Clean(rootPath) var uploadFilePathes []string @@ -105,122 +105,24 @@ func PackageUploadRepPackage(ctx CommandContext, rootPath string, bucketID int64 return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) } - var nodeAff *int64 + var nodeAff *cdssdk.NodeID if len(nodeAffinity) > 0 { - nodeAff = &nodeAffinity[0] + n := cdssdk.NodeID(nodeAffinity[0]) + nodeAff = &n } objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingRepPackage(0, bucketID, name, objIter, cdssdk.NewRepRedundancyInfo(repCount), nodeAff) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingPackage(0, bucketID, name, objIter, nodeAff) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) } for { - complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5) + complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) if complete { if err != nil { - return fmt.Errorf("uploading rep object: %w", err) - } - - tb := table.NewWriter() - - tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"}) - for i := 0; i < len(uploadObjectResult.ObjectResults); i++ { - tb.AppendRow(table.Row{ - uploadObjectResult.ObjectResults[i].Info.Path, - uploadObjectResult.ObjectResults[i].ObjectID, - uploadObjectResult.ObjectResults[i].FileHash, - }) - } - fmt.Print(tb.Render()) - return nil - } - - if err != nil { - return fmt.Errorf("wait uploading: %w", err) - } - } -} - -func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath string) error { - //userID := int64(0) - - var uploadFilePathes []string - err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) - } - - objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingRepPackage(0, packageID, objIter) - if err != nil { - return fmt.Errorf("update object %d failed, err: %w", packageID, err) - } - - for { - complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingRepPackage(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("updating rep object: %w", err) - } - - return nil - } - - if err != nil { - return fmt.Errorf("wait updating: %w", err) - } - } -} - -func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int, nodeAffinity []int64) error { - rootPath = filepath.Clean(rootPath) - - var uploadFilePathes []string - err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) - } - - var nodeAff *int64 - if len(nodeAffinity) > 0 { - nodeAff = &nodeAffinity[0] - } - - objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, cdssdk.NewECRedundancyInfo(ecName, chunkSize), nodeAff) - - if err != nil { - return fmt.Errorf("upload file data failed, err: %w", err) - } - - for { - complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("uploading ec package: %w", err) + return fmt.Errorf("uploading package: %w", err) } tb := table.NewWriter() @@ -242,7 +144,7 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, } } -func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string) error { +func PackageUpdatePackage(ctx CommandContext, packageID cdssdk.PackageID, rootPath string) error { //userID := int64(0) var uploadFilePathes []string @@ -262,16 +164,16 @@ func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string } objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingECPackage(0, packageID, objIter) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingPackage(0, packageID, objIter) if err != nil { return fmt.Errorf("update package %d failed, err: %w", packageID, err) } for { - complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingECPackage(taskID, time.Second*5) + complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingPackage(taskID, time.Second*5) if complete { if err != nil { - return fmt.Errorf("updating ec package: %w", err) + return fmt.Errorf("updating package: %w", err) } return nil @@ -283,8 +185,8 @@ func PackageUpdateECPackage(ctx CommandContext, packageID int64, rootPath string } } -func PackageDeletePackage(ctx CommandContext, packageID int64) error { - userID := int64(0) +func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error { + userID := cdssdk.UserID(0) err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID) if err != nil { return fmt.Errorf("delete package %d failed, err: %w", packageID, err) @@ -292,7 +194,7 @@ func PackageDeletePackage(ctx CommandContext, packageID int64) error { return nil } -func PackageGetCachedNodes(ctx CommandContext, packageID int64, userID int64) error { +func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error { resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID) fmt.Printf("resp: %v\n", resp) if err != nil { @@ -301,7 +203,7 @@ func PackageGetCachedNodes(ctx CommandContext, packageID int64, userID int64) er return nil } -func PackageGetLoadedNodes(ctx CommandContext, packageID int64, userID int64) error { +func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID, userID cdssdk.UserID) error { nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID) fmt.Printf("nodeIDs: %v\n", nodeIDs) if err != nil { @@ -315,13 +217,9 @@ func init() { commands.MustAdd(PackageDownloadPackage, "pkg", "get") - commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "rep") - - commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "rep") - - commands.MustAdd(PackageUploadECPackage, "pkg", "new", "ec") + commands.MustAdd(PackageCreatePackage, "pkg", "new") - commands.MustAdd(PackageUpdateECPackage, "pkg", "update", "ec") + commands.MustAdd(PackageUpdatePackage, "pkg", "update") commands.MustAdd(PackageDeletePackage, "pkg", "delete") diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index bfa49b9..b99bd4a 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -7,7 +7,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) error { +func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error { taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID) if err != nil { return fmt.Errorf("start loading package to storage: %w", err) @@ -30,11 +30,10 @@ func StorageLoadPackage(ctx CommandContext, packageID int64, storageID int64) er } } -func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, storageID int64, path string, repCount int) error { - nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, - cdssdk.NewTypedRepRedundancyInfo(repCount), nil) +func StorageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string) error { + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, nil) if err != nil { - return fmt.Errorf("start storage uploading rep package: %w", err) + return fmt.Errorf("start storage uploading package: %w", err) } for { @@ -55,7 +54,7 @@ func StorageCreateRepPackage(ctx CommandContext, bucketID int64, name string, st } func init() { - commands.MustAdd(StorageLoadPackage, "stg", "load", "pkg") + commands.MustAdd(StorageLoadPackage, "stg", "pkg", "load") - commands.MustAdd(StorageCreateRepPackage, "stg", "upload", "rep") + commands.MustAdd(StorageCreatePackage, "stg", "pkg", "new") } diff --git a/client/internal/http/package.go b/client/internal/http/package.go index c259f1f..c1c454e 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -89,7 +89,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { objIter := mapMultiPartFileToUploadingObject(req.Files) - taskID, err := s.svc.PackageSvc().StartCreatingECPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity) + taskID, err := s.svc.PackageSvc().StartCreatingPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity) if err != nil { log.Warnf("start uploading ec package task: %s", err.Error()) @@ -98,7 +98,7 @@ func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { } for { - complete, createResult, err := s.svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5) + complete, createResult, err := s.svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) if complete { if err != nil { log.Warnf("uploading ec package: %s", err.Error()) diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 2677252..54e9a49 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -23,7 +23,7 @@ func (svc *Service) PackageSvc() *PackageService { return &PackageService{Service: svc} } -func (svc *PackageService) Get(userID int64, packageID int64) (*model.Package, error) { +func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) (*model.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -38,7 +38,7 @@ func (svc *PackageService) Get(userID int64, packageID int64) (*model.Package, e return &getResp.Package, nil } -func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (iterator.DownloadingObjectIterator, error) { +func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -65,137 +65,50 @@ func (svc *PackageService) DownloadPackage(userID int64, packageID int64) (itera return nil, fmt.Errorf("acquire locks failed, err: %w", err) } - getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID)) + getObjsResp, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(packageID)) if err != nil { - return nil, fmt.Errorf("getting package: %w", err) + return nil, fmt.Errorf("getting package object details: %w", err) } - getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) - if err != nil { - return nil, fmt.Errorf("getting package objects: %w", err) - } - - if getPkgResp.Redundancy.IsRepInfo() { - iter, err := svc.downloadRepPackage(packageID, getObjsResp.Objects, coorCli) - - if err != nil { - mutex.Unlock() - return nil, err - } - - iter.OnClosing = func() { - mutex.Unlock() - } - - return iter, nil - } else { - iter, err := svc.downloadECPackage(getPkgResp.Package, getObjsResp.Objects, coorCli) - - if err != nil { - mutex.Unlock() - return nil, err - } - - iter.OnClosing = func() { - mutex.Unlock() - } - - return iter, nil - } -} - -func (svc *PackageService) downloadRepPackage(packageID int64, objects []model.Object, coorCli *coormq.Client) (*iterator.RepObjectIterator, error) { - getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(packageID)) - if err != nil { - return nil, fmt.Errorf("getting package object rep data: %w", err) - } - - iter := iterator.NewRepObjectIterator(objects, getObjRepDataResp.Data, &iterator.DownloadContext{ + iter := iterator.NewObjectIterator(getObjsResp.Objects, &iterator.DownloadContext{ Distlock: svc.DistLock, }) - return iter, nil -} -func (svc *PackageService) downloadECPackage(pkg model.Package, objects []model.Object, coorCli *coormq.Client) (*iterator.ECObjectIterator, error) { - getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(pkg.PackageID)) - if err != nil { - return nil, fmt.Errorf("getting package object ec data: %w", err) - } - - var ecInfo cdssdk.ECRedundancyInfo - if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil { - return nil, fmt.Errorf("get ec redundancy info: %w", err) - } - - getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName)) - if err != nil { - return nil, fmt.Errorf("getting ec: %w", err) + iter.OnClosing = func() { + mutex.Unlock() } - - iter := iterator.NewECObjectIterator(objects, getObjECDataResp.Data, ecInfo, getECResp.Config, &iterator.DownloadContext{ - Distlock: svc.DistLock, - }) - return iter, nil } -func (svc *PackageService) StartCreatingRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, repInfo cdssdk.RepRedundancyInfo, nodeAffinity *int64) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewCreateRepPackage(userID, bucketID, name, objIter, repInfo, nodeAffinity)) - return tsk.ID(), nil -} - -func (svc *PackageService) WaitCreatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *mytask.CreateRepPackageResult, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage) - return true, cteatePkgTask.Result, tsk.Error() - } - return false, nil, nil -} - -func (svc *PackageService) StartUpdatingRepPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewUpdateRepPackage(userID, packageID, objIter)) - return tsk.ID(), nil -} - -func (svc *PackageService) WaitUpdatingRepPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateRepPackageResult, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*mytask.UpdateRepPackage) - return true, updatePkgTask.Result, tsk.Error() - } - return false, nil, nil -} - -func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, ecInfo cdssdk.ECRedundancyInfo, nodeAffinity *int64) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewCreateECPackage(userID, bucketID, name, objIter, ecInfo, nodeAffinity)) +func (svc *PackageService) StartCreatingPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) { + tsk := svc.TaskMgr.StartNew(mytask.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity)) return tsk.ID(), nil } -func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateECPackageResult, error) { +func (svc *PackageService) WaitCreatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreatePackageResult, error) { tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*mytask.CreateECPackage) + cteatePkgTask := tsk.Body().(*mytask.CreatePackage) return true, cteatePkgTask.Result, tsk.Error() } return false, nil, nil } -func (svc *PackageService) StartUpdatingECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewUpdateECPackage(userID, packageID, objIter)) +func (svc *PackageService) StartUpdatingPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) (string, error) { + tsk := svc.TaskMgr.StartNew(mytask.NewUpdatePackage(userID, packageID, objIter)) return tsk.ID(), nil } -func (svc *PackageService) WaitUpdatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdateECPackageResult, error) { +func (svc *PackageService) WaitUpdatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdatePackageResult, error) { tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*mytask.UpdateECPackage) + updatePkgTask := tsk.Body().(*mytask.UpdatePackage) return true, updatePkgTask.Result, tsk.Error() } return false, nil, nil } -func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { +func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) @@ -230,7 +143,7 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { return nil } -func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (cdssdk.PackageCachingInfo, error) { +func (svc *PackageService) GetCachedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) (cdssdk.PackageCachingInfo, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return cdssdk.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err) @@ -250,7 +163,7 @@ func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (cdssdk return tmp, nil } -func (svc *PackageService) GetLoadedNodes(userID int64, packageID int64) ([]int64, error) { +func (svc *PackageService) GetLoadedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]cdssdk.NodeID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) diff --git a/client/internal/task/create_ec_package.go b/client/internal/task/create_ec_package.go index 4eaecc4..1f826b9 100644 --- a/client/internal/task/create_ec_package.go +++ b/client/internal/task/create_ec_package.go @@ -9,21 +9,21 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) -type CreateECPackageResult = cmd.CreateECPackageResult +type CreatePackageResult = cmd.CreatePackageResult -type CreateECPackage struct { - cmd cmd.CreateECPackage +type CreatePackage struct { + cmd cmd.CreatePackage - Result *CreateECPackageResult + Result *CreatePackageResult } -func NewCreateECPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreateECPackage { - return &CreateECPackage{ - cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, nodeAffinity), +func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { + return &CreatePackage{ + cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), } } -func (t *CreateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { +func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, }) diff --git a/client/internal/task/update_ec_package.go b/client/internal/task/update_ec_package.go index 15bf9f0..518cc98 100644 --- a/client/internal/task/update_ec_package.go +++ b/client/internal/task/update_ec_package.go @@ -9,21 +9,21 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) -type UpdateECPackageResult = cmd.UpdateECPackageResult +type UpdatePackageResult = cmd.UpdatePackageResult -type UpdateECPackage struct { - cmd cmd.UpdateECPackage +type UpdatePackage struct { + cmd cmd.UpdatePackage - Result *UpdateECPackageResult + Result *UpdatePackageResult } -func NewUpdateECPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdateECPackage { - return &UpdateECPackage{ - cmd: *cmd.NewUpdateECPackage(userID, packageID, objectIter), +func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdatePackage { + return &UpdatePackage{ + cmd: *cmd.NewUpdatePackage(userID, packageID, objectIter), } } -func (t *UpdateECPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { +func (t *UpdatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ Distlock: ctx.distlock, }) diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 2fdcdfe..3f1bd2e 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -37,13 +37,15 @@ values 5010, 1, "alive" - ) create table Storage ( - StorageID int not null auto_increment primary key comment '存储服务ID', - Name varchar(100) not null comment '存储服务名称', - NodeID int not null comment '存储服务所在节点的ID', - Directory varchar(4096) not null comment '存储服务所在节点的目录', - State varchar(100) comment '状态' - ) comment = "存储服务表"; + ); + +create table Storage ( + StorageID int not null auto_increment primary key comment '存储服务ID', + Name varchar(100) not null comment '存储服务名称', + NodeID int not null comment '存储服务所在节点的ID', + Directory varchar(4096) not null comment '存储服务所在节点的目录', + State varchar(100) comment '状态' +) comment = "存储服务表"; insert into Storage (StorageID, Name, NodeID, Directory, State) @@ -122,16 +124,12 @@ create table Object ( UNIQUE KEY PackagePath (PackageID, Path) ) comment = '对象表'; -create table ObjectRep ( - ObjectID int not null primary key comment '对象ID', - FileHash varchar(100) not null comment '副本哈希值' -) comment = '对象副本表'; - create table ObjectBlock ( ObjectID int not null comment '对象ID', `Index` int not null comment '编码块在条带内的排序', + NodeID int not null comment '此编码块应该存在的节点', FileHash varchar(100) not null comment '编码块哈希值', - primary key(ObjectID, `Index`) + primary key(ObjectID, `Index`, NodeID) ) comment = '对象编码块表'; create table Cache ( @@ -151,6 +149,14 @@ create table StoragePackage ( primary key(PackageID, StorageID, UserID) ); +create table StoragePackageLog ( + PackageID int not null comment '包ID', + StorageID int not null comment '存储服务ID', + UserID int not null comment '调度了此文件的用户ID', + CreateTime timestamp not null comment '加载Package完成的时间', + primary key(PackageID, StorageID, UserID) +); + create table Location ( LocationID int not null auto_increment primary key comment 'ID', Name varchar(128) not null comment '名称' @@ -159,21 +165,4 @@ create table Location ( insert into Location (LocationID, Name) values - (1, "Local"); - -create table Ec ( - EcID int not null primary key comment '纠删码ID', - Name varchar(128) not null comment '纠删码名称', - EcK int not null comment 'ecK', - EcN int not null comment 'ecN' -) comment = '纠删码表'; - -insert into - Ec (EcID, Name, EcK, EcN) -values - (1, "rs_9_6", 6, 9); - -insert into - Ec (EcID, Name, EcK, EcN) -values - (2, "rs_5_3", 3, 5); \ No newline at end of file + (1, "Local"); \ No newline at end of file diff --git a/common/consts/consts.go b/common/consts/consts.go index 6fc4bcb..d4fe5a1 100644 --- a/common/consts/consts.go +++ b/common/consts/consts.go @@ -10,17 +10,6 @@ const ( NodeStateUnavailable = "Unavailable" ) -const ( - PackageStateNormal = "Normal" - PackageStateDeleted = "Deleted" -) - -const ( - StoragePackageStateNormal = "Normal" - StoragePackageStateDeleted = "Deleted" - StoragePackageStateOutdated = "Outdated" -) - const ( CacheStatePinned = "Pinned" CacheStateTemp = "Temp" diff --git a/common/models/models.go b/common/models/models.go index 2a223fd..64098ab 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -2,40 +2,48 @@ package stgmod import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) -/// TODO 将分散在各处的公共结构体定义集中到这里来 +type ObjectBlock struct { + ObjectID cdssdk.ObjectID `db:"ObjectID" json:"objectID"` + Index int `db:"Index" json:"index"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` // 这个块应该在哪个节点上 + FileHash string `db:"FileHash" json:"fileHash"` +} -type ObjectBlockData struct { +type ObjectBlockDetail struct { + ObjectID cdssdk.ObjectID `json:"objectID"` Index int `json:"index"` FileHash string `json:"fileHash"` - NodeID cdssdk.NodeID `json:"nodeID"` - CachedNodeIDs []cdssdk.NodeID `json:"nodeIDs"` + NodeIDs []cdssdk.NodeID `json:"nodeID"` // 这个块应该在哪些节点上 + CachedNodeIDs []cdssdk.NodeID `json:"cachedNodeIDs"` // 哪些节点实际缓存了这个块 } -func NewObjectBlockData(index int, fileHash string, nodeID cdssdk.NodeID, cachedNodeIDs []cdssdk.NodeID) ObjectBlockData { - return ObjectBlockData{ +func NewObjectBlockDetail(objID cdssdk.ObjectID, index int, fileHash string, nodeIDs []cdssdk.NodeID, cachedNodeIDs []cdssdk.NodeID) ObjectBlockDetail { + return ObjectBlockDetail{ + ObjectID: objID, Index: index, FileHash: fileHash, + NodeIDs: nodeIDs, CachedNodeIDs: cachedNodeIDs, } } -type ObjectECData struct { - Object model.Object `json:"object"` - Blocks []ObjectBlockData `json:"blocks"` +type ObjectDetail struct { + Object cdssdk.Object `json:"object"` + Blocks []ObjectBlockDetail `json:"blocks"` } -func NewObjectECData(object model.Object, blocks []ObjectBlockData) ObjectECData { - return ObjectECData{ +func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlockDetail) ObjectDetail { + return ObjectDetail{ Object: object, Blocks: blocks, } } type LocalMachineInfo struct { - NodeID *cdssdk.NodeID `json:"nodeID"` - ExternalIP string `json:"externalIP"` - LocalIP string `json:"localIP"` + NodeID *cdssdk.NodeID `json:"nodeID"` + ExternalIP string `json:"externalIP"` + LocalIP string `json:"localIP"` + LocationID cdssdk.LocationID `json:"locationID"` } diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go index 18b9a6a..c94da80 100644 --- a/common/pkgs/cmd/create_ec_package.go +++ b/common/pkgs/cmd/create_ec_package.go @@ -1,56 +1,70 @@ package cmd import ( + "errors" "fmt" "io" "math/rand" "sync" + "time" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type CreateECPackage struct { - userID int64 - bucketID int64 +type CreatePackage struct { + userID cdssdk.UserID + bucketID cdssdk.BucketID name string objectIter iterator.UploadingObjectIterator - redundancy cdssdk.ECRedundancyInfo - nodeAffinity *int64 + nodeAffinity *cdssdk.NodeID } -type CreateECPackageResult struct { - PackageID int64 - ObjectResults []ECObjectUploadResult +type CreatePackageResult struct { + PackageID cdssdk.PackageID + ObjectResults []ObjectUploadResult } -type ECObjectUploadResult struct { - Info *iterator.IterUploadingObject - Error error - ObjectID int64 +type ObjectUploadResult struct { + Info *iterator.IterUploadingObject + Error error + // TODO 这个字段没有被赋值 + ObjectID cdssdk.ObjectID } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage { - return &CreateECPackage{ +type UploadNodeInfo struct { + Node model.Node + IsSameLocation bool +} + +type UpdatePackageContext struct { + Distlock *distlock.Service +} + +func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { + return &CreatePackage{ userID: userID, bucketID: bucketID, name: name, objectIter: objIter, - redundancy: redundancy, nodeAffinity: nodeAffinity, } } -func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageResult, error) { +func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult, error) { defer t.objectIter.Close() coorCli, err := stgglb.CoordinatorMQPool.Acquire() @@ -78,8 +92,7 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe } defer mutex.Unlock() - createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, - cdssdk.NewTypedRedundancyInfo(t.redundancy))) + createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name)) if err != nil { return nil, fmt.Errorf("creating package: %w", err) } @@ -89,30 +102,20 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) - if err != nil { - return nil, fmt.Errorf("finding client location: %w", err) - } - - uploadNodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { + userNodes := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { return UploadNodeInfo{ Node: node, - IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, + IsSameLocation: node.LocationID == stgglb.Local.LocationID, } }) - getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(t.redundancy.ECName)) - if err != nil { - return nil, fmt.Errorf("getting ec: %w", err) - } - // 给上传节点的IPFS加锁 ipfsReqBlder := reqbuilder.NewBuilder() // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 if stgglb.Local.NodeID != nil { ipfsReqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) } - for _, node := range uploadNodeInfos { + for _, node := range userNodes { if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID { continue } @@ -126,27 +129,66 @@ func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageRe } defer ipfsMutex.Unlock() - // TODO 需要支持设置节点亲和性 - rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config) + rets, err := uploadAndUpdatePackage(createPkgResp.PackageID, t.objectIter, userNodes, t.nodeAffinity) if err != nil { return nil, err } - return &CreateECPackageResult{ + return &CreatePackageResult{ PackageID: createPkgResp.PackageID, ObjectResults: rets, }, nil } -func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) { +// chooseUploadNode 选择一个上传文件的节点 +// 1. 选择设置了亲和性的节点 +// 2. 从与当前客户端相同地域的节点中随机选一个 +// 3. 没有用的话从所有节点中随机选一个 +func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) UploadNodeInfo { + if nodeAffinity != nil { + aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity }) + if ok { + return aff + } + } + + sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation }) + if len(sameLocationNodes) > 0 { + return sameLocationNodes[rand.Intn(len(sameLocationNodes))] + } + + return nodes[rand.Intn(len(nodes))] +} + +func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo { + for i := len(uploadNodes); i < extendTo; i++ { + uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))]) + } + + // 随机排列上传节点 + rand.Shuffle(len(uploadNodes), func(i, j int) { + uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i] + }) + + return uploadNodes +} + +func chooseRedundancy(obj *iterator.IterUploadingObject, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) (cdssdk.Redundancy, []UploadNodeInfo, error) { + // TODO 更好的算法 + // uploadNodes = shuffleNodes(uploadNodes, ecRed.N) + uploadNode := chooseUploadNode(userNodes, nodeAffinity) + return cdssdk.NewRepRedundancy(), []UploadNodeInfo{uploadNode}, nil +} + +func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - var uploadRets []ECObjectUploadResult + var uploadRets []ObjectUploadResult //上传文件夹 - var adds []coormq.AddECObjectInfo + var adds []coormq.AddObjectInfo for { objInfo, err := objectIter.MoveNext() if err == iterator.ErrNoMoreItem { @@ -158,8 +200,20 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje err = func() error { defer objInfo.File.Close() - fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec) - uploadRets = append(uploadRets, ECObjectUploadResult{ + red, uploadNodes, err := chooseRedundancy(objInfo, userNodes, nodeAffinity) + if err != nil { + return fmt.Errorf("choosing redundancy: %w", err) + } + + var addInfo *coormq.AddObjectInfo + switch r := red.(type) { + case *cdssdk.RepRedundancy: + addInfo, err = uploadRepObject(objInfo, uploadNodes, r) + case *cdssdk.ECRedundancy: + addInfo, err = uploadECObject(objInfo, uploadNodes, r) + } + + uploadRets = append(uploadRets, ObjectUploadResult{ Info: objInfo, Error: err, }) @@ -167,7 +221,7 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje return fmt.Errorf("uploading object: %w", err) } - adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs)) + adds = append(adds, *addInfo) return nil }() if err != nil { @@ -175,7 +229,7 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje } } - _, err = coorCli.UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil)) + _, err = coorCli.UpdateECPackage(coormq.NewUpdatePackage(packageID, adds, nil)) if err != nil { return nil, fmt.Errorf("updating package: %w", err) } @@ -183,16 +237,55 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje return uploadRets, nil } -// 上传文件 -func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ecMod model.Ec) ([]string, []int64, error) { - uploadNodes = shuffleNodes(uploadNodes, ecMod.EcN) +func uploadRepObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, repRed *cdssdk.RepRedundancy) (*coormq.AddObjectInfo, error) { + clonedStrs := myio.Clone(obj.File, len(uploadNodes)) + + fileHashes := make([]string, len(uploadNodes)) + anyErrs := make([]error, len(uploadNodes)) + wg := sync.WaitGroup{} + for i := range uploadNodes { + idx := i + wg.Add(1) + + go func() { + defer wg.Done() + fileHashes[idx], anyErrs[idx] = uploadFile(clonedStrs[idx], uploadNodes[idx]) + }() + } + + wg.Wait() + + var uploadedNodeIDs []cdssdk.NodeID + var fileHash string + var errs []error + for i, e := range anyErrs { + if e != nil { + errs[i] = e + continue + } + + uploadedNodeIDs = append(uploadedNodeIDs, uploadNodes[i].Node.NodeID) + fileHash = fileHashes[i] + } + + if len(uploadedNodeIDs) == 0 { + return nil, fmt.Errorf("uploading file: %w", errors.Join(errs...)) + } + + info := coormq.NewAddObjectInfo(obj.Path, obj.Size, repRed, + []stgmod.ObjectBlockDetail{ + stgmod.NewObjectBlockDetail(0, 0, fileHash, uploadedNodeIDs, uploadedNodeIDs), + }) + return &info, nil +} - rs, err := ec.NewRs(ecMod.EcK, ecMod.EcN, ecInfo.ChunkSize) +func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecRed *cdssdk.ECRedundancy) (*coormq.AddObjectInfo, error) { + rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) if err != nil { - return nil, nil, err + return nil, err } - outputs := myio.ChunkedSplit(obj.File, ecInfo.ChunkSize, ecMod.EcK, myio.ChunkedSplitOption{ + outputs := myio.ChunkedSplit(obj.File, ecRed.ChunkSize, ecRed.K, myio.ChunkedSplitOption{ PaddingZeros: true, }) var readers []io.Reader @@ -209,17 +302,18 @@ func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeI wg := sync.WaitGroup{} - nodeIDs := make([]int64, ecMod.EcN) - fileHashes := make([]string, ecMod.EcN) - anyErrs := make([]error, ecMod.EcN) + blocks := make([]stgmod.ObjectBlockDetail, ecRed.N) + anyErrs := make([]error, ecRed.N) for i := range encStrs { idx := i wg.Add(1) - nodeIDs[idx] = uploadNodes[idx].Node.NodeID + blocks[idx].Index = idx + blocks[idx].NodeIDs = []cdssdk.NodeID{uploadNodes[idx].Node.NodeID} + blocks[idx].CachedNodeIDs = []cdssdk.NodeID{uploadNodes[idx].Node.NodeID} go func() { defer wg.Done() - fileHashes[idx], anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx]) + blocks[idx].FileHash, anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx]) }() } @@ -227,22 +321,110 @@ func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeI for i, e := range anyErrs { if e != nil { - return nil, nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e) + return nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e) } } - return fileHashes, nodeIDs, nil + info := coormq.NewAddObjectInfo(obj.Path, obj.Size, ecRed, blocks) + return &info, nil } -func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo { - for i := len(uploadNodes); i < extendTo; i++ { - uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))]) +func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { + // 本地有IPFS,则直接从本地IPFS上传 + if stgglb.IPFSPool != nil { + logger.Infof("try to use local IPFS to upload file") + + // 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件 + fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, stgglb.Local.NodeID == nil) + if err == nil { + return fileHash, nil + + } else { + logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error()) + } } - // 随机排列上传节点 - rand.Shuffle(len(uploadNodes), func(i, j int) { - uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i] - }) + // 否则发送到agent上传 + // 如果客户端与节点在同一个地域,则使用内网地址连接节点 + nodeIP := uploadNode.Node.ExternalIP + grpcPort := uploadNode.Node.ExternalGRPCPort + if uploadNode.IsSameLocation { + nodeIP = uploadNode.Node.LocalIP + grpcPort = uploadNode.Node.LocalGRPCPort - return uploadNodes + logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID) + } + + fileHash, err := uploadToNode(file, nodeIP, grpcPort) + if err != nil { + return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) + } + + return fileHash, nil +} + +func uploadToNode(file io.Reader, nodeIP string, grpcPort int) (string, error) { + rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) + if err != nil { + return "", fmt.Errorf("new agent rpc client: %w", err) + } + defer rpcCli.Close() + + return rpcCli.SendIPFSFile(file) +} + +func uploadToLocalIPFS(file io.Reader, nodeID cdssdk.NodeID, shouldPin bool) (string, error) { + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return "", fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + // 从本地IPFS上传文件 + fileHash, err := ipfsCli.CreateFile(file) + if err != nil { + return "", fmt.Errorf("creating ipfs file: %w", err) + } + + if !shouldPin { + return fileHash, nil + } + + err = pinIPFSFile(nodeID, fileHash) + if err != nil { + return "", err + } + + return fileHash, nil +} + +func pinIPFSFile(nodeID cdssdk.NodeID, fileHash string) error { + agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) + if err != nil { + return fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agtCli) + + // 然后让最近节点pin本地上传的文件 + pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash)) + if err != nil { + return fmt.Errorf("start pinning object: %w", err) + } + + for { + waitResp, err := agtCli.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) + if err != nil { + return fmt.Errorf("waitting pinning object: %w", err) + } + + if waitResp.IsComplete { + if waitResp.Error != "" { + return fmt.Errorf("agent pinning object: %s", waitResp.Error) + } + + break + } + } + + return nil } diff --git a/common/pkgs/cmd/create_rep_package.go b/common/pkgs/cmd/create_rep_package.go deleted file mode 100644 index a2819f3..0000000 --- a/common/pkgs/cmd/create_rep_package.go +++ /dev/null @@ -1,304 +0,0 @@ -package cmd - -import ( - "fmt" - "io" - "math/rand" - "time" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" - agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -type UploadNodeInfo struct { - Node model.Node - IsSameLocation bool -} - -type CreateRepPackage struct { - userID int64 - bucketID int64 - name string - objectIter iterator.UploadingObjectIterator - redundancy cdssdk.RepRedundancyInfo - nodeAffinity *int64 -} - -type UpdatePackageContext struct { - Distlock *distlock.Service -} - -type CreateRepPackageResult struct { - PackageID int64 - ObjectResults []RepObjectUploadResult -} - -type RepObjectUploadResult struct { - Info *iterator.IterUploadingObject - Error error - FileHash string - ObjectID int64 -} - -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.RepRedundancyInfo, nodeAffinity *int64) *CreateRepPackage { - return &CreateRepPackage{ - userID: userID, - bucketID: bucketID, - name: name, - objectIter: objIter, - redundancy: redundancy, - nodeAffinity: nodeAffinity, - } -} - -func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackageResult, error) { - defer t.objectIter.Close() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - - reqBlder := reqbuilder.NewBuilder() - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if stgglb.Local.NodeID != nil { - reqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) - } - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有桶的权限 - UserBucket().ReadOne(t.userID, t.bucketID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于创建包信息 - Package().CreateOne(t.bucketID, t.name). - // 用于创建包中的文件的信息 - Object().CreateAny(). - // 用于设置EC配置 - ObjectBlock().CreateAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, - cdssdk.NewTypedRedundancyInfo(t.redundancy))) - if err != nil { - return nil, fmt.Errorf("creating package: %w", err) - } - - getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) - if err != nil { - return nil, fmt.Errorf("getting user nodes: %w", err) - } - - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) - if err != nil { - return nil, fmt.Errorf("finding client location: %w", err) - } - - nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { - return UploadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, - } - }) - uploadNode := t.chooseUploadNode(nodeInfos, t.nodeAffinity) - - // 防止上传的副本被清除 - ipfsMutex, err := reqbuilder.NewBuilder(). - IPFS().CreateAnyRep(uploadNode.Node.NodeID). - MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - defer ipfsMutex.Unlock() - - rets, err := uploadAndUpdateRepPackage(createPkgResp.PackageID, t.objectIter, uploadNode) - if err != nil { - return nil, err - } - - return &CreateRepPackageResult{ - PackageID: createPkgResp.PackageID, - ObjectResults: rets, - }, nil -} - -func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNode UploadNodeInfo) ([]RepObjectUploadResult, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - - var uploadRets []RepObjectUploadResult - var adds []coormq.AddRepObjectInfo - for { - objInfo, err := objectIter.MoveNext() - if err == iterator.ErrNoMoreItem { - break - } - if err != nil { - return nil, fmt.Errorf("reading object: %w", err) - } - - err = func() error { - defer objInfo.File.Close() - fileHash, err := uploadFile(objInfo.File, uploadNode) - uploadRets = append(uploadRets, RepObjectUploadResult{ - Info: objInfo, - Error: err, - FileHash: fileHash, - }) - if err != nil { - return fmt.Errorf("uploading object: %w", err) - } - - adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID})) - return nil - }() - if err != nil { - return nil, err - } - } - - _, err = coorCli.UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil)) - if err != nil { - return nil, fmt.Errorf("updating package: %w", err) - } - - return uploadRets, nil -} - -// 上传文件 -func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { - // 本地有IPFS,则直接从本地IPFS上传 - if stgglb.IPFSPool != nil { - logger.Infof("try to use local IPFS to upload file") - - // 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件 - fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, stgglb.Local.NodeID == nil) - if err == nil { - return fileHash, nil - - } else { - logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error()) - } - } - - // 否则发送到agent上传 - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := uploadNode.Node.ExternalIP - grpcPort := uploadNode.Node.ExternalGRPCPort - if uploadNode.IsSameLocation { - nodeIP = uploadNode.Node.LocalIP - grpcPort = uploadNode.Node.LocalGRPCPort - - logger.Infof("client and node %d are at the same location, use local ip", uploadNode.Node.NodeID) - } - - fileHash, err := uploadToNode(file, nodeIP, grpcPort) - if err != nil { - return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) - } - - return fileHash, nil -} - -// chooseUploadNode 选择一个上传文件的节点 -// 1. 选择设置了亲和性的节点 -// 2. 从与当前客户端相同地域的节点中随机选一个 -// 3. 没有用的话从所有节点中随机选一个 -func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *int64) UploadNodeInfo { - if nodeAffinity != nil { - aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity }) - if ok { - return aff - } - } - - sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation }) - if len(sameLocationNodes) > 0 { - return sameLocationNodes[rand.Intn(len(sameLocationNodes))] - } - - return nodes[rand.Intn(len(nodes))] -} - -func uploadToNode(file io.Reader, nodeIP string, grpcPort int) (string, error) { - rpcCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) - if err != nil { - return "", fmt.Errorf("new agent rpc client: %w", err) - } - defer rpcCli.Close() - - return rpcCli.SendIPFSFile(file) -} - -func uploadToLocalIPFS(file io.Reader, nodeID int64, shouldPin bool) (string, error) { - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return "", fmt.Errorf("new ipfs client: %w", err) - } - defer ipfsCli.Close() - - // 从本地IPFS上传文件 - fileHash, err := ipfsCli.CreateFile(file) - if err != nil { - return "", fmt.Errorf("creating ipfs file: %w", err) - } - - if !shouldPin { - return fileHash, nil - } - - err = pinIPFSFile(nodeID, fileHash) - if err != nil { - return "", err - } - - return fileHash, nil -} - -func pinIPFSFile(nodeID int64, fileHash string) error { - agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) - if err != nil { - return fmt.Errorf("new agent client: %w", err) - } - defer stgglb.AgentMQPool.Release(agtCli) - - // 然后让最近节点pin本地上传的文件 - pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash)) - if err != nil { - return fmt.Errorf("start pinning object: %w", err) - } - - for { - waitResp, err := agtCli.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) - if err != nil { - return fmt.Errorf("waitting pinning object: %w", err) - } - - if waitResp.IsComplete { - if waitResp.Error != "" { - return fmt.Errorf("agent pinning object: %s", waitResp.Error) - } - - break - } - } - - return nil -} diff --git a/common/pkgs/cmd/download_package.go b/common/pkgs/cmd/download_package.go index cf031d0..1abcba5 100644 --- a/common/pkgs/cmd/download_package.go +++ b/common/pkgs/cmd/download_package.go @@ -10,14 +10,13 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) type DownloadPackage struct { - userID int64 - packageID int64 + userID cdssdk.UserID + packageID cdssdk.PackageID outputPath string } @@ -25,7 +24,7 @@ type DownloadPackageContext struct { Distlock *distlock.Service } -func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { +func NewDownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *DownloadPackage { return &DownloadPackage{ userID: userID, packageID: packageID, @@ -40,85 +39,20 @@ func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error { } defer stgglb.CoordinatorMQPool.Release(coorCli) - getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) + getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID)) if err != nil { - - return fmt.Errorf("getting package: %w", err) - } - - var objIter iterator.DownloadingObjectIterator - if getPkgResp.Redundancy.IsRepInfo() { - objIter, err = t.downloadRep(ctx) - } else { - objIter, err = t.downloadEC(ctx, getPkgResp.Package) + return fmt.Errorf("getting package object details: %w", err) } - if err != nil { - return err - } - defer objIter.Close() - return t.writeObject(objIter) -} - -func (t *DownloadPackage) downloadRep(ctx *DownloadPackageContext) (iterator.DownloadingObjectIterator, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) - if err != nil { - return nil, fmt.Errorf("getting package objects: %w", err) - } - - getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(t.packageID)) - if err != nil { - return nil, fmt.Errorf("getting package object rep data: %w", err) - } - - iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, &iterator.DownloadContext{ - Distlock: ctx.Distlock, - }) - - return iter, nil -} - -func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Package) (iterator.DownloadingObjectIterator, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) - if err != nil { - return nil, fmt.Errorf("getting package objects: %w", err) - } - - getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(t.packageID)) - if err != nil { - return nil, fmt.Errorf("getting package object ec data: %w", err) - } - - var ecInfo cdssdk.ECRedundancyInfo - if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil { - return nil, fmt.Errorf("get ec redundancy info: %w", err) - } - - getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName)) - if err != nil { - return nil, fmt.Errorf("getting ec: %w", err) - } - - iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, ecInfo, getECResp.Config, &iterator.DownloadContext{ + objIter := iterator.NewObjectIterator(getObjectDetails.Objects, &iterator.DownloadContext{ Distlock: ctx.Distlock, }) + defer objIter.Close() - return iter, nil + return t.writeObjects(objIter) } -func (t *DownloadPackage) writeObject(objIter iterator.DownloadingObjectIterator) error { +func (t *DownloadPackage) writeObjects(objIter iterator.DownloadingObjectIterator) error { for { objInfo, err := objIter.MoveNext() if err == iterator.ErrNoMoreItem { diff --git a/common/pkgs/cmd/update_ec_package.go b/common/pkgs/cmd/update_ec_package.go index a196569..675fcb0 100644 --- a/common/pkgs/cmd/update_ec_package.go +++ b/common/pkgs/cmd/update_ec_package.go @@ -14,25 +14,30 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type UpdateECPackage struct { - userID int64 - packageID int64 +type UpdatePackage struct { + userID cdssdk.UserID + packageID cdssdk.PackageID objectIter iterator.UploadingObjectIterator } -type UpdateECPackageResult struct { - ObjectResults []ECObjectUploadResult +type UpdatePackageResult struct { + ObjectResults []ObjectUploadResult } -func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) *UpdateECPackage { - return &UpdateECPackage{ +type UpdateNodeInfo struct { + UploadNodeInfo + HasOldObject bool +} + +func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) *UpdatePackage { + return &UpdatePackage{ userID: userID, packageID: packageID, objectIter: objIter, } } -func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageResult, error) { +func (t *UpdatePackage) Execute(ctx *UpdatePackageContext) (*UpdatePackageResult, error) { defer t.objectIter.Close() coorCli, err := stgglb.CoordinatorMQPool.Acquire() @@ -58,45 +63,25 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe } defer mutex.Unlock() - getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) - if err != nil { - return nil, fmt.Errorf("getting package: %w", err) - } - getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) - if err != nil { - return nil, fmt.Errorf("finding client location: %w", err) - } - - nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { + userNodes := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { return UploadNodeInfo{ Node: node, - IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, + IsSameLocation: node.LocationID == stgglb.Local.LocationID, } }) - var ecInfo cdssdk.ECRedundancyInfo - if ecInfo, err = getPkgResp.Package.Redundancy.ToECInfo(); err != nil { - return nil, fmt.Errorf("get ec redundancy info: %w", err) - } - - getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName)) - if err != nil { - return nil, fmt.Errorf("getting ec: %w", err) - } - // 给上传节点的IPFS加锁 ipfsReqBlder := reqbuilder.NewBuilder() // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 if stgglb.Local.NodeID != nil { ipfsReqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) } - for _, node := range nodeInfos { + for _, node := range userNodes { if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID { continue } @@ -110,12 +95,12 @@ func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageRe } defer ipfsMutex.Unlock() - rets, err := uploadAndUpdateECPackage(t.packageID, t.objectIter, nodeInfos, ecInfo, getECResp.Config) + rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, nil) if err != nil { return nil, err } - return &UpdateECPackageResult{ + return &UpdatePackageResult{ ObjectResults: rets, }, nil } diff --git a/common/pkgs/cmd/update_rep_package.go b/common/pkgs/cmd/update_rep_package.go deleted file mode 100644 index a42530b..0000000 --- a/common/pkgs/cmd/update_rep_package.go +++ /dev/null @@ -1,128 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/samber/lo" - mysort "gitlink.org.cn/cloudream/common/utils/sort" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -type UpdateRepPackage struct { - userID int64 - packageID int64 - objectIter iterator.UploadingObjectIterator -} - -type UpdateNodeInfo struct { - UploadNodeInfo - HasOldObject bool -} - -type UpdateRepPackageResult struct { - ObjectResults []RepObjectUploadResult -} - -func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage { - return &UpdateRepPackage{ - userID: userID, - packageID: packageID, - objectIter: objectIter, - } -} - -func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackageResult, error) { - defer t.objectIter.Close() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - - reqBlder := reqbuilder.NewBuilder() - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if stgglb.Local.NodeID != nil { - reqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID) - } - mutex, err := reqBlder. - Metadata(). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于创建包信息 - Package().WriteOne(t.packageID). - // 用于创建包中的文件的信息 - Object().CreateAny(). - // 用于设置EC配置 - ObjectBlock().CreateAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) - if err != nil { - return nil, fmt.Errorf("getting user nodes: %w", err) - } - - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) - if err != nil { - return nil, fmt.Errorf("finding client location: %w", err) - } - - nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UpdateNodeInfo { - return UpdateNodeInfo{ - UploadNodeInfo: UploadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID, - }, - } - }) - // 上传文件的方式优先级: - // 1. 本地IPFS - // 2. 包含了旧文件,且与客户端在同地域的节点 - // 3. 不在同地域,但包含了旧文件的节点 - // 4. 同地域节点 - // TODO 需要考虑在多文件的情况下的规则 - uploadNode := t.chooseUploadNode(nodeInfos) - - // 防止上传的副本被清除 - ipfsMutex, err := reqbuilder.NewBuilder(). - IPFS().CreateAnyRep(uploadNode.Node.NodeID). - MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - defer ipfsMutex.Unlock() - - rets, err := uploadAndUpdateRepPackage(t.packageID, t.objectIter, uploadNode.UploadNodeInfo) - if err != nil { - return nil, err - } - - return &UpdateRepPackageResult{ - ObjectResults: rets, - }, nil -} - -// chooseUploadNode 选择一个上传文件的节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (t *UpdateRepPackage) chooseUploadNode(nodes []UpdateNodeInfo) UpdateNodeInfo { - mysort.Sort(nodes, func(left, right UpdateNodeInfo) int { - v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject) - if v != 0 { - return v - } - - return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation) - }) - - return nodes[0] -} diff --git a/common/pkgs/db/ec.go b/common/pkgs/db/ec.go deleted file mode 100644 index ba486e9..0000000 --- a/common/pkgs/db/ec.go +++ /dev/null @@ -1,30 +0,0 @@ -package db - -import ( - //"database/sql" - - "github.com/jmoiron/sqlx" - //"gitlink.org.cn/cloudream/common/consts" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" -) - -type EcDB struct { - *DB -} - -func (db *DB) Ec() *EcDB { - return &EcDB{DB: db} -} - -// GetEc 查询纠删码参数 -func (db *EcDB) GetEc(ctx SQLContext, ecName string) (model.Ec, error) { - var ret model.Ec - err := sqlx.Get(ctx, &ret, "select * from Ec where Name = ?", ecName) - return ret, err -} - -func (db *EcDB) GetEcName(ctx SQLContext, objectID int) (string, error) { - var ret string - err := sqlx.Get(ctx, &ret, "select Redundancy from Object where ObjectID = ?") - return ret, err -} diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 7bf9cd5..bb0c13e 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -4,28 +4,29 @@ import ( "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" ) // TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里 type Node struct { - NodeID int64 `db:"NodeID" json:"nodeID"` - Name string `db:"Name" json:"name"` - LocalIP string `db:"LocalIP" json:"localIP"` - ExternalIP string `db:"ExternalIP" json:"externalIP"` - LocalGRPCPort int `db:"LocalGRPCPort" json:"localGRPCPort"` - ExternalGRPCPort int `db:"ExternalGRPCPort" json:"externalGRPCPort"` - LocationID int64 `db:"LocationID" json:"locationID"` - State string `db:"State" json:"state"` - LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` + Name string `db:"Name" json:"name"` + LocalIP string `db:"LocalIP" json:"localIP"` + ExternalIP string `db:"ExternalIP" json:"externalIP"` + LocalGRPCPort int `db:"LocalGRPCPort" json:"localGRPCPort"` + ExternalGRPCPort int `db:"ExternalGRPCPort" json:"externalGRPCPort"` + LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"` + State string `db:"State" json:"state"` + LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"` } type Storage struct { - StorageID int64 `db:"StorageID" json:"storageID"` - Name string `db:"Name" json:"name"` - NodeID int64 `db:"NodeID" json:"nodeID"` - Directory string `db:"Directory" json:"directory"` - State string `db:"State" json:"state"` + StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` + Name string `db:"Name" json:"name"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` + Directory string `db:"Directory" json:"directory"` + State string `db:"State" json:"state"` } type NodeDelay struct { @@ -35,69 +36,67 @@ type NodeDelay struct { } type User struct { - UserID int64 `db:"UserID" json:"userID"` - Password string `db:"PassWord" json:"password"` + UserID cdssdk.UserID `db:"UserID" json:"userID"` + Password string `db:"PassWord" json:"password"` } type UserBucket struct { - UserID int64 `db:"UserID" json:"userID"` - BucketID int64 `db:"BucketID" json:"bucketID"` + UserID cdssdk.UserID `db:"UserID" json:"userID"` + BucketID cdssdk.BucketID `db:"BucketID" json:"bucketID"` } type UserNode struct { - UserID int64 `db:"UserID" json:"userID"` - NodeID int64 `db:"NodeID" json:"nodeID"` + UserID cdssdk.UserID `db:"UserID" json:"userID"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` } type UserStorage struct { - UserID int64 `db:"UserID" json:"userID"` - StorageID int64 `db:"StorageID" json:"storageID"` + UserID cdssdk.UserID `db:"UserID" json:"userID"` + StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` } type Bucket struct { - BucketID int64 `db:"BucketID" json:"bucketID"` - Name string `db:"Name" json:"name"` - CreatorID int64 `db:"CreatorID" json:"creatorID"` + BucketID cdssdk.BucketID `db:"BucketID" json:"bucketID"` + Name string `db:"Name" json:"name"` + CreatorID cdssdk.UserID `db:"CreatorID" json:"creatorID"` } type Package = cdssdk.Package type Object = cdssdk.Object -type ObjectRep struct { - ObjectID int64 `db:"ObjectID" json:"objectID"` - FileHash string `db:"FileHash" json:"fileHash"` -} - -type ObjectBlock struct { - ObjectID int64 `db:"ObjectID" json:"objectID"` - Index int `db:"Index" json:"index"` - FileHash string `db:"FileHash" json:"fileHash"` -} +type ObjectBlock = stgmod.ObjectBlock type Cache struct { - FileHash string `db:"FileHash" json:"fileHash"` - NodeID int64 `db:"NodeID" json:"nodeID"` - State string `db:"State" json:"state"` - CacheTime time.Time `db:"CacheTime" json:"cacheTime"` - Priority int `db:"Priority" json:"priority"` + FileHash string `db:"FileHash" json:"fileHash"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` + State string `db:"State" json:"state"` + CacheTime time.Time `db:"CacheTime" json:"cacheTime"` + Priority int `db:"Priority" json:"priority"` } +const ( + StoragePackageStateNormal = "Normal" + StoragePackageStateDeleted = "Deleted" + StoragePackageStateOutdated = "Outdated" +) + +// Storage当前加载的Package type StoragePackage struct { - PackageID int64 `db:"PackageID" json:"packageID"` - StorageID int64 `db:"StorageID" json:"storageID"` - UserID int64 `db:"UserID" json:"userID"` - State string `db:"State" json:"state"` + StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` + PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"` + UserID cdssdk.UserID `db:"UserID" json:"userID"` + State string `db:"State" json:"state"` } -type Location struct { - LocationID int64 `db:"LocationID" json:"locationID"` - Name string `db:"Name" json:"name"` +type StoragePackageLog struct { + StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` + PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"` + UserID cdssdk.UserID `db:"UserID" json:"userID"` + CreateTime time.Time `db:"CreateTime" json:"createTime"` } -type Ec struct { - EcID int `db:"EcID" json:"ecID"` - Name string `db:"Name" json:"name"` - EcK int `db:"EcK" json:"ecK"` - EcN int `db:"EcN" json:"ecN"` +type Location struct { + LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"` + Name string `db:"Name" json:"name"` } diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index e027ae3..8c84632 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" - "github.com/samber/lo" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -17,15 +17,16 @@ func (db *DB) Object() *ObjectDB { return &ObjectDB{DB: db} } -func (db *ObjectDB) GetByID(ctx SQLContext, objectID int64) (model.Object, error) { +func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) { var ret model.Object err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID) return ret, err } -func (db *ObjectDB) Create(ctx SQLContext, packageID int64, path string, size int64) (int64, error) { - sql := "insert into Object(PackageID, Path, Size) values(?,?,?)" - ret, err := ctx.Exec(sql, packageID, path, size) +func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, redundancy cdssdk.Redundancy) (int64, error) { + sql := "insert into Object(PackageID, Path, Size, Redundancy) values(?,?,?,?)" + + ret, err := ctx.Exec(sql, packageID, path, size, redundancy) if err != nil { return 0, fmt.Errorf("insert object failed, err: %w", err) } @@ -39,9 +40,10 @@ func (db *ObjectDB) Create(ctx SQLContext, packageID int64, path string, size in } // 创建或者更新记录,返回值true代表是创建,false代表是更新 -func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID int64, path string, size int64) (int64, bool, error) { - sql := "insert into Object(PackageID, Path, Size) values(?,?,?) on duplicate key update Size = ?" - ret, err := ctx.Exec(sql, packageID, path, size, size) +func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, redundancy cdssdk.Redundancy) (cdssdk.ObjectID, bool, error) { + sql := "insert into Object(PackageID, Path, Size, Redundancy) values(?,?,?,?) on duplicate key update Size = ?, Redundancy = ?" + + ret, err := ctx.Exec(sql, packageID, path, size, redundancy, size, redundancy) if err != nil { return 0, false, fmt.Errorf("insert object failed, err: %w", err) } @@ -57,10 +59,10 @@ func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID int64, path string, if err != nil { return 0, false, fmt.Errorf("get id of inserted object failed, err: %w", err) } - return objectID, true, nil + return cdssdk.ObjectID(objectID), true, nil } - var objID int64 + var objID cdssdk.ObjectID if err = sqlx.Get(ctx, &objID, "select ObjectID from Object where PackageID = ? and Path = ?", packageID, path); err != nil { return 0, false, fmt.Errorf("getting object id: %w", err) } @@ -68,65 +70,7 @@ func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID int64, path string, return objID, false, nil } -func (db *ObjectDB) UpdateRepObject(ctx SQLContext, objectID int64, fileSize int64, nodeIDs []int64, fileHash string) error { - _, err := db.UpdateFileInfo(ctx, objectID, fileSize) - if err != nil { - if err != nil { - return fmt.Errorf("update rep object failed, err: %w", err) - } - } - - objRep, err := db.ObjectRep().GetByID(ctx, objectID) - if err != nil { - return fmt.Errorf("get object rep failed, err: %w", err) - } - - // 如果新文件与旧文件的Hash不同,则需要更新关联的FileHash,重新插入Cache记录 - if objRep.FileHash != fileHash { - _, err := db.ObjectRep().Update(ctx, objectID, fileHash) - if err != nil { - return fmt.Errorf("update rep object file hash failed, err: %w", err) - } - - for _, nodeID := range nodeIDs { - err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority = 0 - if err != nil { - return fmt.Errorf("create cache failed, err: %w", err) - } - } - - } else { - // 如果相同,则只增加Cache中不存在的记录 - cachedNodes, err := db.Cache().GetCachingFileNodes(ctx, fileHash) - if err != nil { - return fmt.Errorf("find caching file nodes failed, err: %w", err) - } - - // 筛选出不在cachedNodes中的id - newNodeIDs := lo.Filter(nodeIDs, func(id int64, index int) bool { - return lo.NoneBy(cachedNodes, func(node model.Node) bool { - return node.NodeID == id - }) - }) - for _, nodeID := range newNodeIDs { - err := db.Cache().CreatePinned(ctx, fileHash, nodeID, 0) //priority - if err != nil { - return fmt.Errorf("create cache failed, err: %w", err) - } - } - } - - return nil -} - -func (*ObjectDB) BatchGetAllEcObjectIDs(ctx SQLContext, start int, count int) ([]int64, error) { - var ret []int64 - rep := "rep" - err := sqlx.Select(ctx, &ret, "SELECT ObjectID FROM object where Redundancy != ? limit ?, ?", rep, start, count) - return ret, err -} - -func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID int64, fileSize int64) (bool, error) { +func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) { ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID) if err != nil { return false, err @@ -140,100 +84,17 @@ func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID int64, fileSize int64) return cnt > 0, nil } -func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID int64) ([]model.Object, error) { +func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { var ret []model.Object err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID) return ret, err } -func (db *ObjectDB) BatchAddRep(ctx SQLContext, packageID int64, objs []coormq.AddRepObjectInfo) ([]int64, error) { - var objIDs []int64 - for _, obj := range objs { - // 创建对象的记录 - objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size) - if err != nil { - return nil, fmt.Errorf("creating object: %w", err) - } - - objIDs = append(objIDs, objID) - - if isCreate { - if err := db.createRep(ctx, objID, obj); err != nil { - return nil, err - } - } else { - if err := db.updateRep(ctx, objID, obj); err != nil { - return nil, err - } - } - } - - return objIDs, nil -} - -func (db *ObjectDB) createRep(ctx SQLContext, objID int64, obj coormq.AddRepObjectInfo) error { - // 创建对象副本的记录 - if err := db.ObjectRep().Create(ctx, objID, obj.FileHash); err != nil { - return fmt.Errorf("creating object rep: %w", err) - } - - // 创建缓存记录 - priority := 0 //优先级暂时设置为0 - for _, nodeID := range obj.NodeIDs { - if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, priority); err != nil { - return fmt.Errorf("creating cache: %w", err) - } - } - - return nil -} -func (db *ObjectDB) updateRep(ctx SQLContext, objID int64, obj coormq.AddRepObjectInfo) error { - objRep, err := db.ObjectRep().GetByID(ctx, objID) - if err != nil { - return fmt.Errorf("getting object rep: %w", err) - } - - // 如果新文件与旧文件的Hash不同,则需要更新关联的FileHash,重新插入Cache记录 - if objRep.FileHash != obj.FileHash { - _, err := db.ObjectRep().Update(ctx, objID, obj.FileHash) - if err != nil { - return fmt.Errorf("updating rep object file hash: %w", err) - } - - for _, nodeID := range obj.NodeIDs { - if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, 0); err != nil { - return fmt.Errorf("creating cache: %w", err) - } - } - - } else { - // 如果相同,则只增加Cache中不存在的记录 - cachedNodes, err := db.Cache().GetCachingFileNodes(ctx, obj.FileHash) - if err != nil { - return fmt.Errorf("finding caching file nodes: %w", err) - } - - // 筛选出不在cachedNodes中的id - newNodeIDs := lo.Filter(obj.NodeIDs, func(id int64, index int) bool { - return lo.NoneBy(cachedNodes, func(node model.Node) bool { - return node.NodeID == id - }) - }) - for _, nodeID := range newNodeIDs { - if err := db.Cache().CreatePinned(ctx, obj.FileHash, nodeID, 0); err != nil { - return fmt.Errorf("creating cache: %w", err) - } - } - } - - return nil -} - -func (db *ObjectDB) BatchAddEC(ctx SQLContext, packageID int64, objs []coormq.AddECObjectInfo) ([]int64, error) { - objIDs := make([]int64, 0, len(objs)) +func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectInfo) ([]cdssdk.ObjectID, error) { + objIDs := make([]cdssdk.ObjectID, 0, len(objs)) for _, obj := range objs { // 创建对象的记录 - objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size) + objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size, obj.Redundancy) if err != nil { return nil, fmt.Errorf("creating object: %w", err) } @@ -245,23 +106,24 @@ func (db *ObjectDB) BatchAddEC(ctx SQLContext, packageID int64, objs []coormq.Ad if err = db.ObjectBlock().DeleteObjectAll(ctx, objID); err != nil { return nil, fmt.Errorf("deleting all object block: %w", err) } - } // 创建编码块的记录 - for i := 0; i < len(obj.FileHashes); i++ { - err := db.ObjectBlock().Create(ctx, objID, i, obj.FileHashes[i]) - if err != nil { - return nil, fmt.Errorf("creating object block: %w", err) + for _, b := range obj.Blocks { + for _, n := range b.NodeIDs { + err := db.ObjectBlock().Create(ctx, objID, b.Index, b.FileHash, n) + if err != nil { + return nil, fmt.Errorf("creating object block: %w", err) + } } - } - // 创建缓存记录 - priority := 0 //优先级暂时设置为0 - for i, nodeID := range obj.NodeIDs { - err = db.Cache().CreatePinned(ctx, obj.FileHashes[i], nodeID, priority) - if err != nil { - return nil, fmt.Errorf("creating cache: %w", err) + // 创建缓存记录 + priority := 0 //优先级暂时设置为0 + for _, nodeID := range b.CachedNodeIDs { + err = db.Cache().CreatePinned(ctx, b.FileHash, nodeID, priority) + if err != nil { + return nil, fmt.Errorf("creating cache: %w", err) + } } } } @@ -269,12 +131,12 @@ func (db *ObjectDB) BatchAddEC(ctx SQLContext, packageID int64, objs []coormq.Ad return objIDs, nil } -func (*ObjectDB) BatchDelete(ctx SQLContext, ids []int64) error { +func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { _, err := ctx.Exec("delete from Object where ObjectID in (?)", ids) return err } -func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID int64) error { +func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { _, err := ctx.Exec("delete from Object where PackageID = ?", packageID) return err } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index dd0ab58..f1a6f7d 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -8,7 +8,6 @@ import ( "github.com/jmoiron/sqlx" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/consts" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -42,7 +41,7 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in "select count(FileHash) from ObjectBlock, Object, Package where FileHash = ? and"+ " ObjectBlock.ObjectID = Object.ObjectID and"+ " Object.PackageID = Package.PackageID and"+ - " Package.State = ?", fileHash, consts.PackageStateNormal) + " Package.State = ?", fileHash, cdssdk.PackageStateNormal) if err == sql.ErrNoRows { return 0, nil } @@ -50,47 +49,49 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in return cnt, err } -func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectECData, error) { +func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) { var objs []model.Object err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) if err != nil { return nil, fmt.Errorf("query objectIDs: %w", err) } - rets := make([]stgmod.ObjectECData, 0, len(objs)) + rets := make([]stgmod.ObjectDetail, 0, len(objs)) for _, obj := range objs { var tmpRets []struct { - Index int `db:"Index"` - FileHash string `db:"FileHash"` - NodeIDs *string `db:"NodeIDs"` + Index int `db:"Index"` + FileHashes string `db:"FileHashes"` + NodeIDs string `db:"NodeIDs"` + CachedNodeIDs *string `db:"CachedNodeIDs"` } err := sqlx.Select(ctx, &tmpRets, - "select ObjectBlock.Index, ObjectBlock.FileHash, group_concat(NodeID) as NodeIDs from ObjectBlock"+ - " left join Cache on ObjectBlock.FileHash = Cache.FileHash"+ - " where ObjectID = ? group by ObjectBlock.Index, ObjectBlock.FileHash", + "select ObjectBlock.Index, group_concat(distinct ObjectBlock.FileHash) as FileHashes, group_concat(distinct ObjectBlock.NodeID) as NodeIDs, group_concat(distinct Cache.NodeID) as CachedNodeIDs"+ + " from ObjectBlock left join Cache on ObjectBlock.FileHash = Cache.FileHash"+ + " where ObjectID = ? group by ObjectBlock.Index", obj.ObjectID, ) if err != nil { return nil, err } - blocks := make([]stgmod.ObjectBlockData, 0, len(tmpRets)) + blocks := make([]stgmod.ObjectBlockDetail, 0, len(tmpRets)) for _, tmp := range tmpRets { - var block stgmod.ObjectBlockData + var block stgmod.ObjectBlockDetail block.Index = tmp.Index - block.FileHash = tmp.FileHash - if tmp.NodeIDs != nil { - block.CachedNodeIDs = splitIDStringUnsafe(*tmp.NodeIDs) + block.FileHash = splitConcatedToString(tmp.FileHashes)[0] + block.NodeIDs = splitConcatedToNodeID(tmp.NodeIDs) + if tmp.CachedNodeIDs != nil { + block.CachedNodeIDs = splitConcatedToNodeID(*tmp.CachedNodeIDs) } blocks = append(blocks, block) } - rets = append(rets, stgmod.NewObjectECData(obj, blocks)) + rets = append(rets, stgmod.NewObjectDetail(obj, blocks)) } return rets, nil @@ -98,7 +99,7 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID cdssdk // 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 // 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 -func splitIDStringUnsafe(idStr string) []cdssdk.NodeID { +func splitConcatedToNodeID(idStr string) []cdssdk.NodeID { idStrs := strings.Split(idStr, ",") ids := make([]cdssdk.NodeID, 0, len(idStrs)) @@ -110,3 +111,9 @@ func splitIDStringUnsafe(idStr string) []cdssdk.NodeID { return ids } + +// 按逗号切割字符串 +func splitConcatedToString(idStr string) []string { + idStrs := strings.Split(idStr, ",") + return idStrs +} diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index 58c6b28..9ef4baf 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -8,7 +8,6 @@ import ( "github.com/jmoiron/sqlx" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -92,7 +91,7 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin } sql := "insert into Package(Name, BucketID, State) values(?,?,?)" - r, err := ctx.Exec(sql, name, bucketID, consts.PackageStateNormal) + r, err := ctx.Exec(sql, name, bucketID, cdssdk.PackageStateNormal) if err != nil { return 0, fmt.Errorf("insert package failed, err: %w", err) } @@ -114,11 +113,11 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID cdssdk.PackageID) erro // 不是正常状态的Package,则不删除 // TODO 未来可能有其他状态 - if obj.State != consts.PackageStateNormal { + if obj.State != cdssdk.PackageStateNormal { return nil } - err = db.ChangeState(ctx, packageID, consts.PackageStateDeleted) + err = db.ChangeState(ctx, packageID, cdssdk.PackageStateDeleted) if err != nil { return fmt.Errorf("change package state failed, err: %w", err) } @@ -145,7 +144,7 @@ func (PackageDB) DeleteUnused(ctx SQLContext, packageID cdssdk.PackageID) error _, err := ctx.Exec("delete from Package where PackageID = ? and State = ? and "+ "not exists(select StorageID from StoragePackage where PackageID = ?)", packageID, - consts.PackageStateDeleted, + cdssdk.PackageStateDeleted, packageID, ) diff --git a/common/pkgs/db/storage_package.go b/common/pkgs/db/storage_package.go index f201634..6597132 100644 --- a/common/pkgs/db/storage_package.go +++ b/common/pkgs/db/storage_package.go @@ -5,7 +5,6 @@ import ( "github.com/jmoiron/sqlx" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -36,7 +35,7 @@ func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID cdssdk.Stor } func (*StoragePackageDB) LoadPackage(ctx SQLContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID, userID cdssdk.UserID) error { - _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, consts.StoragePackageStateNormal) + _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, model.StoragePackageStateNormal) return err } @@ -48,11 +47,11 @@ func (*StoragePackageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID, // SetStateNormal 将状态设置为Normal,如果记录状态是Deleted,则不进行操作 func (*StoragePackageDB) SetStateNormal(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { _, err := ctx.Exec("update StoragePackage set State = ? where StorageID = ? and PackageID = ? and UserID = ? and State <> ?", - consts.StoragePackageStateNormal, + model.StoragePackageStateNormal, storageID, packageID, userID, - consts.StoragePackageStateDeleted, + model.StoragePackageStateDeleted, ) return err } @@ -80,8 +79,8 @@ func (*StoragePackageDB) SetAllPackageState(ctx SQLContext, packageID cdssdk.Pac func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) { ret, err := ctx.Exec( "update StoragePackage set State = ? where State = ? and PackageID = ?", - consts.StoragePackageStateOutdated, - consts.StoragePackageStateNormal, + model.StoragePackageStateOutdated, + model.StoragePackageStateNormal, packageID, ) if err != nil { @@ -97,7 +96,7 @@ func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID cdssdk. } func (db *StoragePackageDB) SetAllPackageDeleted(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) { - return db.SetAllPackageState(ctx, packageID, consts.StoragePackageStateDeleted) + return db.SetAllPackageState(ctx, packageID, model.StoragePackageStateDeleted) } func (*StoragePackageDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { diff --git a/common/pkgs/db/storage_package_log.go b/common/pkgs/db/storage_package_log.go new file mode 100644 index 0000000..bd0c532 --- /dev/null +++ b/common/pkgs/db/storage_package_log.go @@ -0,0 +1,34 @@ +package db + +import ( + "time" + + "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" +) + +type StoragePackageLogDB struct { + *DB +} + +func (db *DB) StoragePackageLog() *StoragePackageLogDB { + return &StoragePackageLogDB{DB: db} +} + +func (*StoragePackageLogDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackage, error) { + var ret model.StoragePackage + err := sqlx.Get(ctx, &ret, "select * from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID) + return ret, err +} + +func (*StoragePackageLogDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, createTime time.Time) (model.StoragePackage, error) { + var ret model.StoragePackage + err := sqlx.Get(ctx, &ret, "insert into StoragePackageLog values(?,?,?,?)", storageID, packageID, userID, createTime) + return ret, err +} + +func (*StoragePackageLogDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { + _, err := ctx.Exec("delete from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID) + return err +} diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/ec_object_iterator.go index 97b9ae1..9cbafc2 100644 --- a/common/pkgs/iterator/ec_object_iterator.go +++ b/common/pkgs/iterator/ec_object_iterator.go @@ -4,62 +4,62 @@ import ( "fmt" "io" "math/rand" + "reflect" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type ECObjectIterator struct { +type DownloadingObjectIterator = Iterator[*IterDownloadingObject] + +type IterDownloadingObject struct { + Object model.Object + File io.ReadCloser +} + +type DownloadNodeInfo struct { + Node model.Node + IsSameLocation bool +} + +type DownloadContext struct { + Distlock *distlock.Service +} +type ObjectIterator struct { OnClosing func() - objects []model.Object - objectECData []stgmodels.ObjectECData - currentIndex int - inited bool + objectDetails []stgmodels.ObjectDetail + currentIndex int - ecInfo cdssdk.ECRedundancyInfo - ec model.Ec downloadCtx *DownloadContext - cliLocation model.Location } -func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator { - return &ECObjectIterator{ - objects: objects, - objectECData: objectECData, - ecInfo: ecInfo, - ec: ec, - downloadCtx: downloadCtx, +func NewObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *ObjectIterator { + return &ObjectIterator{ + objectDetails: objectDetails, + downloadCtx: downloadCtx, } } -func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { - // TODO 加锁 +func (i *ObjectIterator) MoveNext() (*IterDownloadingObject, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) - if !i.inited { - i.inited = true - - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) - if err != nil { - return nil, fmt.Errorf("finding client location: %w", err) - } - i.cliLocation = findCliLocResp.Location - } - - if i.currentIndex >= len(i.objects) { + if i.currentIndex >= len(i.objectDetails) { return nil, ErrNoMoreItem } @@ -68,25 +68,63 @@ func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { return item, err } -func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { - obj := iter.objects[iter.currentIndex] - ecData := iter.objectECData[iter.currentIndex] +func (iter *ObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { + obj := iter.objectDetails[iter.currentIndex] - //采取直接读,优先选内网节点 - var chosenNodes []DownloadNodeInfo - var chosenBlocks []stgmodels.ObjectBlockData - for i := range ecData.Blocks { - if len(chosenBlocks) == iter.ec.EcK { - break + switch red := obj.Object.Redundancy.(type) { + case *cdssdk.RepRedundancy: + reader, err := iter.downloadRepObject(coorCli, iter.downloadCtx, obj, red) + if err != nil { + return nil, fmt.Errorf("downloading rep object: %w", err) } - // 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行 + return &IterDownloadingObject{ + Object: obj.Object, + File: reader, + }, nil - if len(ecData.Blocks[i].NodeIDs) == 0 { - continue + case *cdssdk.ECRedundancy: + reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red) + if err != nil { + return nil, fmt.Errorf("downloading ec object: %w", err) } - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(ecData.Blocks[i].NodeIDs)) + return &IterDownloadingObject{ + Object: obj.Object, + File: reader, + }, nil + } + + return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy)) +} + +func (i *ObjectIterator) Close() { + if i.OnClosing != nil { + i.OnClosing() + } +} + +// chooseDownloadNode 选择一个下载节点 +// 1. 从与当前客户端相同地域的节点中随机选一个 +// 2. 没有用的话从所有节点中随机选一个 +func (i *ObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo { + sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation }) + if len(sameLocationEntries) > 0 { + return sameLocationEntries[rand.Intn(len(sameLocationEntries))] + } + + return entries[rand.Intn(len(entries))] +} + +func (iter *ObjectIterator) downloadRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, repRed *cdssdk.RepRedundancy) (io.ReadCloser, error) { + //采取直接读,优先选内网节点 + var chosenNodes []DownloadNodeInfo + for i := range obj.Blocks { + if len(obj.Blocks[i].CachedNodeIDs) == 0 { + return nil, fmt.Errorf("no node has block %d", obj.Blocks[i].Index) + } + + getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs)) if err != nil { continue } @@ -94,58 +132,78 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { return DownloadNodeInfo{ Node: node, - IsSameLocation: node.LocationID == iter.cliLocation.LocationID, + IsSameLocation: node.LocationID == stgglb.Local.LocationID, } }) - chosenBlocks = append(chosenBlocks, ecData.Blocks[i]) chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) - } - if len(chosenBlocks) < iter.ec.EcK { - return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", iter.ec.EcK, len(chosenBlocks)) - } + var fileStrs []io.ReadCloser - reader, err := iter.downloadEcObject(iter.downloadCtx, obj.Size, chosenNodes, chosenBlocks) - if err != nil { - return nil, fmt.Errorf("ec read failed, err: %w", err) + for i := range obj.Blocks { + str, err := downloadFile(ctx, chosenNodes[i], obj.Blocks[i].FileHash) + if err != nil { + for i -= 1; i >= 0; i-- { + fileStrs[i].Close() + } + return nil, fmt.Errorf("donwloading file: %w", err) + } + + fileStrs = append(fileStrs, str) } - return &IterDownloadingObject{ - Object: obj, - File: reader, - }, nil + fileReaders, filesCloser := myio.ToReaders(fileStrs) + return myio.AfterReadClosed(myio.Length(myio.Join(fileReaders), obj.Object.Size), func(c io.ReadCloser) { + filesCloser() + }), nil } -func (i *ECObjectIterator) Close() { - if i.OnClosing != nil { - i.OnClosing() - } -} +func (iter *ObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { + //采取直接读,优先选内网节点 + var chosenNodes []DownloadNodeInfo + var chosenBlocks []stgmodels.ObjectBlockDetail + for i := range obj.Blocks { + if len(chosenBlocks) == ecRed.K { + break + } + + // 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行 + + if len(obj.Blocks[i].CachedNodeIDs) == 0 { + continue + } + + getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs)) + if err != nil { + continue + } + + downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { + return DownloadNodeInfo{ + Node: node, + IsSameLocation: node.LocationID == stgglb.Local.LocationID, + } + }) + + chosenBlocks = append(chosenBlocks, obj.Blocks[i]) + chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) -// chooseDownloadNode 选择一个下载节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo { - sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation }) - if len(sameLocationEntries) > 0 { - return sameLocationEntries[rand.Intn(len(sameLocationEntries))] } - return entries[rand.Intn(len(entries))] -} + if len(chosenBlocks) < ecRed.K { + return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks)) + } -func (iter *ECObjectIterator) downloadEcObject(ctx *DownloadContext, fileSize int64, nodes []DownloadNodeInfo, blocks []stgmodels.ObjectBlockData) (io.ReadCloser, error) { var fileStrs []io.ReadCloser - rs, err := ec.NewRs(iter.ec.EcK, iter.ec.EcN, iter.ecInfo.ChunkSize) + rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) if err != nil { return nil, fmt.Errorf("new rs: %w", err) } - for i := range blocks { - str, err := downloadFile(ctx, nodes[i], blocks[i].FileHash) + for i := range chosenBlocks { + str, err := downloadFile(ctx, chosenNodes[i], chosenBlocks[i].FileHash) if err != nil { for i -= 1; i >= 0; i-- { fileStrs[i].Close() @@ -159,13 +217,97 @@ func (iter *ECObjectIterator) downloadEcObject(ctx *DownloadContext, fileSize in fileReaders, filesCloser := myio.ToReaders(fileStrs) var indexes []int - for _, b := range blocks { + for _, b := range chosenBlocks { indexes = append(indexes, b.Index) } outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) - return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(iter.ecInfo.ChunkSize)), fileSize), func(c io.ReadCloser) { + return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { filesCloser() outputsCloser() }), nil } + +func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) { + // 如果客户端与节点在同一个地域,则使用内网地址连接节点 + nodeIP := node.Node.ExternalIP + grpcPort := node.Node.ExternalGRPCPort + if node.IsSameLocation { + nodeIP = node.Node.LocalIP + grpcPort = node.Node.LocalGRPCPort + + logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID) + } + + if stgglb.IPFSPool != nil { + logger.Infof("try to use local IPFS to download file") + + reader, err := downloadFromLocalIPFS(ctx, fileHash) + if err == nil { + return reader, nil + } + + logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) + } + + return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash) +} + +func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { + // 二次获取锁 + mutex, err := reqbuilder.NewBuilder(). + // 用于从IPFS下载文件 + IPFS().ReadOneRep(nodeID, fileHash). + MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + + // 连接grpc + agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) + if err != nil { + return nil, fmt.Errorf("new agent grpc client: %w", err) + } + + reader, err := agtCli.GetIPFSFile(fileHash) + if err != nil { + return nil, fmt.Errorf("getting ipfs file: %w", err) + } + + reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { + mutex.Unlock() + }) + return reader, nil +} + +func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) { + onClosed := func() {} + if stgglb.Local.NodeID != nil { + // 二次获取锁 + mutex, err := reqbuilder.NewBuilder(). + // 用于从IPFS下载文件 + IPFS().ReadOneRep(*stgglb.Local.NodeID, fileHash). + MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + onClosed = func() { + mutex.Unlock() + } + } + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new ipfs client: %w", err) + } + + reader, err := ipfsCli.OpenRead(fileHash) + if err != nil { + return nil, fmt.Errorf("read ipfs file failed, err: %w", err) + } + + reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { + onClosed() + }) + return reader, nil +} diff --git a/common/pkgs/iterator/rep_object_iterator.go b/common/pkgs/iterator/rep_object_iterator.go deleted file mode 100644 index de2504f..0000000 --- a/common/pkgs/iterator/rep_object_iterator.go +++ /dev/null @@ -1,212 +0,0 @@ -package iterator - -import ( - "fmt" - "io" - "math/rand" - - "github.com/samber/lo" - - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/logger" - myio "gitlink.org.cn/cloudream/common/utils/io" - - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -type DownloadingObjectIterator = Iterator[*IterDownloadingObject] - -type RepObjectIterator struct { - OnClosing func() - - objects []model.Object - objectRepData []stgmod.ObjectRepData - currentIndex int - inited bool - - downloadCtx *DownloadContext - cliLocation model.Location -} - -type IterDownloadingObject struct { - Object model.Object - File io.ReadCloser -} - -type DownloadNodeInfo struct { - Node model.Node - IsSameLocation bool -} - -type DownloadContext struct { - Distlock *distlock.Service -} - -func NewRepObjectIterator(objects []model.Object, objectRepData []stgmod.ObjectRepData, downloadCtx *DownloadContext) *RepObjectIterator { - return &RepObjectIterator{ - objects: objects, - objectRepData: objectRepData, - downloadCtx: downloadCtx, - } -} - -func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { - // TODO 加锁 - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - if !i.inited { - i.inited = true - - findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP)) - if err != nil { - return nil, fmt.Errorf("finding client location: %w", err) - } - i.cliLocation = findCliLocResp.Location - } - - if i.currentIndex >= len(i.objects) { - return nil, ErrNoMoreItem - } - - item, err := i.doMove(coorCli) - i.currentIndex++ - return item, err -} - -func (i *RepObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { - repData := i.objectRepData[i.currentIndex] - if len(repData.NodeIDs) == 0 { - return nil, fmt.Errorf("no node has this file %s", repData.FileHash) - } - - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(repData.NodeIDs)) - if err != nil { - return nil, fmt.Errorf("getting nodes: %w", err) - } - - downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { - return DownloadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == i.cliLocation.LocationID, - } - }) - - reader, err := downloadFile(i.downloadCtx, i.chooseDownloadNode(downloadNodes), repData.FileHash) - if err != nil { - return nil, fmt.Errorf("rep read failed, err: %w", err) - } - return &IterDownloadingObject{ - Object: i.objects[i.currentIndex], - File: reader, - }, nil -} - -func (i *RepObjectIterator) Close() { - if i.OnClosing != nil { - i.OnClosing() - } -} - -// chooseDownloadNode 选择一个下载节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo { - sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation }) - if len(sameLocationEntries) > 0 { - return sameLocationEntries[rand.Intn(len(sameLocationEntries))] - } - - return entries[rand.Intn(len(entries))] -} - -func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) { - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := node.Node.ExternalIP - grpcPort := node.Node.ExternalGRPCPort - if node.IsSameLocation { - nodeIP = node.Node.LocalIP - grpcPort = node.Node.LocalGRPCPort - - logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID) - } - - if stgglb.IPFSPool != nil { - logger.Infof("try to use local IPFS to download file") - - reader, err := downloadFromLocalIPFS(ctx, fileHash) - if err == nil { - return reader, nil - } - - logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) - } - - return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash) -} - -func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { - // 二次获取锁 - mutex, err := reqbuilder.NewBuilder(). - // 用于从IPFS下载文件 - IPFS().ReadOneRep(nodeID, fileHash). - MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - - // 连接grpc - agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) - if err != nil { - return nil, fmt.Errorf("new agent grpc client: %w", err) - } - - reader, err := agtCli.GetIPFSFile(fileHash) - if err != nil { - return nil, fmt.Errorf("getting ipfs file: %w", err) - } - - reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { - mutex.Unlock() - }) - return reader, nil -} - -func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) { - onClosed := func() {} - if stgglb.Local.NodeID != nil { - // 二次获取锁 - mutex, err := reqbuilder.NewBuilder(). - // 用于从IPFS下载文件 - IPFS().ReadOneRep(*stgglb.Local.NodeID, fileHash). - MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - onClosed = func() { - mutex.Unlock() - } - } - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new ipfs client: %w", err) - } - - reader, err := ipfsCli.OpenRead(fileHash) - if err != nil { - return nil, fmt.Errorf("read ipfs file failed, err: %w", err) - } - - reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { - onClosed() - }) - return reader, nil -} diff --git a/common/pkgs/mq/coordinator/common.go b/common/pkgs/mq/coordinator/common.go deleted file mode 100644 index f0a60b9..0000000 --- a/common/pkgs/mq/coordinator/common.go +++ /dev/null @@ -1,64 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" -) - -type CommonService interface { - FindClientLocation(msg *FindClientLocation) (*FindClientLocationResp, *mq.CodeMessage) - - GetECConfig(msg *GetECConfig) (*GetECConfigResp, *mq.CodeMessage) -} - -// 查询指定IP所属的地域 -var _ = Register(Service.FindClientLocation) - -type FindClientLocation struct { - mq.MessageBodyBase - IP string `json:"ip"` -} -type FindClientLocationResp struct { - mq.MessageBodyBase - Location model.Location `json:"location"` -} - -func NewFindClientLocation(ip string) *FindClientLocation { - return &FindClientLocation{ - IP: ip, - } -} -func NewFindClientLocationResp(location model.Location) *FindClientLocationResp { - return &FindClientLocationResp{ - Location: location, - } -} -func (client *Client) FindClientLocation(msg *FindClientLocation) (*FindClientLocationResp, error) { - return mq.Request(Service.FindClientLocation, client.rabbitCli, msg) -} - -// 获取EC具体配置 -var _ = Register(Service.GetECConfig) - -type GetECConfig struct { - mq.MessageBodyBase - ECName string `json:"ecName"` -} -type GetECConfigResp struct { - mq.MessageBodyBase - Config model.Ec `json:"config"` -} - -func NewGetECConfig(ecName string) *GetECConfig { - return &GetECConfig{ - ECName: ecName, - } -} -func NewGetECConfigResp(config model.Ec) *GetECConfigResp { - return &GetECConfigResp{ - Config: config, - } -} -func (client *Client) GetECConfig(msg *GetECConfig) (*GetECConfigResp, error) { - return mq.Request(Service.GetECConfig, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index c355a01..780e2af 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -11,7 +11,7 @@ import ( type ObjectService interface { GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) - GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, *mq.CodeMessage) + GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 @@ -42,28 +42,28 @@ func (client *Client) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObje return mq.Request(Service.GetPackageObjects, client.rabbitCli, msg) } -// 获取指定Object的EC数据,返回的Objects会按照ObjectID升序 -var _ = Register(Service.GetPackageObjectECData) +// 获取Package中所有Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序 +var _ = Register(Service.GetPackageObjectDetails) -type GetPackageObjectECData struct { +type GetPackageObjectDetails struct { mq.MessageBodyBase PackageID cdssdk.PackageID `json:"packageID"` } -type GetPackageObjectECDataResp struct { +type GetPackageObjectDetailsResp struct { mq.MessageBodyBase - Data []stgmod.ObjectECData `json:"data"` + Objects []stgmod.ObjectDetail `json:"objects"` } -func NewGetPackageObjectECData(packageID cdssdk.PackageID) *GetPackageObjectECData { - return &GetPackageObjectECData{ +func NewGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails { + return &GetPackageObjectDetails{ PackageID: packageID, } } -func NewGetPackageObjectECDataResp(data []stgmod.ObjectECData) *GetPackageObjectECDataResp { - return &GetPackageObjectECDataResp{ - Data: data, +func NewGetPackageObjectDetailsResp(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp { + return &GetPackageObjectDetailsResp{ + Objects: objects, } } -func (client *Client) GetPackageObjectECData(msg *GetPackageObjectECData) (*GetPackageObjectECDataResp, error) { - return mq.Request(Service.GetPackageObjectECData, client.rabbitCli, msg) +func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, error) { + return mq.Request(Service.GetPackageObjectDetails, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index d9e402e..7962f97 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -93,10 +93,10 @@ type UpdatePackageResp struct { mq.MessageBodyBase } type AddObjectInfo struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - Redundancy cdssdk.Redundancy `json:"redundancy"` - Blocks []stgmod.ObjectBlockData `json:"blocks"` + Path string `json:"path"` + Size int64 `json:"size,string"` + Redundancy cdssdk.Redundancy `json:"redundancy"` + Blocks []stgmod.ObjectBlockDetail `json:"blocks"` } func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes []cdssdk.ObjectID) *UpdatePackage { @@ -109,7 +109,7 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes func NewUpdatePackageResp() *UpdatePackageResp { return &UpdatePackageResp{} } -func NewAddObjectInfo(path string, size int64, redundancy cdssdk.Redundancy, blocks []stgmod.ObjectBlockData) AddObjectInfo { +func NewAddObjectInfo(path string, size int64, redundancy cdssdk.Redundancy, blocks []stgmod.ObjectBlockDetail) AddObjectInfo { return AddObjectInfo{ Path: path, Size: size, diff --git a/common/pkgs/mq/coordinator/server.go b/common/pkgs/mq/coordinator/server.go index 76b57a7..8444c6f 100644 --- a/common/pkgs/mq/coordinator/server.go +++ b/common/pkgs/mq/coordinator/server.go @@ -13,8 +13,6 @@ type Service interface { CacheService - CommonService - NodeService ObjectService diff --git a/common/utils/utils.go b/common/utils/utils.go index fc93d42..0ef07e2 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -3,9 +3,11 @@ package utils import ( "path/filepath" "strconv" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) // MakeStorageLoadPackagePath Load操作时,写入的文件夹的名称 -func MakeStorageLoadPackagePath(stgDir string, userID int64, packageID int64) string { - return filepath.Join(stgDir, strconv.FormatInt(userID, 10), "packages", strconv.FormatInt(packageID, 10)) +func MakeStorageLoadPackagePath(stgDir string, userID cdssdk.UserID, packageID cdssdk.PackageID) string { + return filepath.Join(stgDir, strconv.FormatInt(int64(userID), 10), "packages", strconv.FormatInt(int64(packageID), 10)) } diff --git a/coordinator/internal/services/conmmon.go b/coordinator/internal/services/conmmon.go deleted file mode 100644 index 063f6da..0000000 --- a/coordinator/internal/services/conmmon.go +++ /dev/null @@ -1,30 +0,0 @@ -package services - -import ( - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -func (svc *Service) FindClientLocation(msg *coormq.FindClientLocation) (*coormq.FindClientLocationResp, *mq.CodeMessage) { - location, err := svc.db.Location().FindLocationByExternalIP(svc.db.SQLCtx(), msg.IP) - if err != nil { - logger.WithField("IP", msg.IP). - Warnf("finding location by external ip: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "query client location failed") - } - - return mq.ReplyOK(coormq.NewFindClientLocationResp(location)) -} - -func (svc *Service) GetECConfig(msg *coormq.GetECConfig) (*coormq.GetECConfigResp, *mq.CodeMessage) { - ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.ECName) - if err != nil { - logger.WithField("ECName", msg.ECName). - Warnf("query ec failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "query ec failed") - } - - return mq.ReplyOK(coormq.NewGetECConfigResp(ec)) -}