From 3ee0e068e978c74fd4dfd0938be7d0936026b3b9 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sat, 16 Mar 2024 11:05:39 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E7=94=A8=E6=88=B7ID=E4=BB=8E1=E5=BC=80?= =?UTF-8?q?=E5=A7=8B=EF=BC=8C0=E4=B8=BA=E6=97=A0=E6=95=88ID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/bucket.go | 6 +++--- client/internal/cmdline/cache.go | 2 +- client/internal/cmdline/storage.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/client/internal/cmdline/bucket.go b/client/internal/cmdline/bucket.go index a35dd56..208a449 100644 --- a/client/internal/cmdline/bucket.go +++ b/client/internal/cmdline/bucket.go @@ -8,7 +8,7 @@ import ( ) func BucketListUserBuckets(ctx CommandContext) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) buckets, err := ctx.Cmdline.Svc.BucketSvc().GetUserBuckets(userID) if err != nil { @@ -29,7 +29,7 @@ func BucketListUserBuckets(ctx CommandContext) error { } func BucketCreateBucket(ctx CommandContext, bucketName string) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) bucketID, err := ctx.Cmdline.Svc.BucketSvc().CreateBucket(userID, bucketName) if err != nil { @@ -41,7 +41,7 @@ func BucketCreateBucket(ctx CommandContext, bucketName string) error { } func BucketDeleteBucket(ctx CommandContext, bucketID cdssdk.BucketID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) err := ctx.Cmdline.Svc.BucketSvc().DeleteBucket(userID, bucketID) if err != nil { diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index a94859e..08ab896 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -8,7 +8,7 @@ import ( ) func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { - taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(0, packageID, nodeID) + taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(1, packageID, nodeID) if err != nil { return fmt.Errorf("start cache moving package: %w", err) } diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index 355e85c..e47665c 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -8,7 +8,7 @@ import ( ) func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error { - nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID) + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(1, packageID, storageID) if err != nil { return fmt.Errorf("start loading package to storage: %w", err) } @@ -31,7 +31,7 @@ func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageI } func StorageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string) error { - nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(0, bucketID, name, storageID, path, nil) + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(1, bucketID, name, storageID, path, nil) if err != nil { return fmt.Errorf("start storage uploading package: %w", err) } From eb50316b87c9928930ec4fcedd020a46b2d79f31 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 18 Mar 2024 10:11:24 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E5=8E=9F=E6=9C=AC=E7=9A=84=E6=96=B0?= =?UTF-8?q?=E5=BB=BAPackage=E6=8E=A5=E5=8F=A3=E6=8B=86=E6=88=90=E6=96=B0?= =?UTF-8?q?=E5=BB=BA+=E4=B8=8A=E4=BC=A0=E4=B8=A4=E4=B8=AA=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/create_package.go | 60 +++++++-- client/internal/cmdline/commandline.go | 5 + client/internal/cmdline/object.go | 58 +++++++++ client/internal/cmdline/package.go | 115 ++---------------- client/internal/http/bucket.go | 61 ++++++++++ client/internal/http/cache.go | 2 +- client/internal/http/object.go | 52 +++++++- client/internal/http/package.go | 69 ++--------- client/internal/http/server.go | 26 ++-- client/internal/http/storage.go | 2 +- client/internal/services/object.go | 17 +++ client/internal/services/package.go | 46 +++---- client/internal/task/create_package.go | 35 ------ client/internal/task/update_package.go | 36 ------ client/internal/task/upload_objects.go | 36 ++++++ common/assets/scripts/create_database.sql | 2 + common/pkgs/cmd/update_package.go | 88 -------------- .../{create_package.go => upload_objects.go} | 37 +++--- common/pkgs/db/model/model.go | 6 +- common/pkgs/db/object.go | 76 +++--------- common/pkgs/mq/coordinator/package.go | 22 ++-- .../event/check_package_redundancy.go | 2 +- 22 files changed, 384 insertions(+), 469 deletions(-) create mode 100644 client/internal/cmdline/object.go create mode 100644 client/internal/http/bucket.go delete mode 100644 client/internal/task/create_package.go delete mode 100644 client/internal/task/update_package.go create mode 100644 client/internal/task/upload_objects.go delete mode 100644 common/pkgs/cmd/update_package.go rename common/pkgs/cmd/{create_package.go => upload_objects.go} (87%) diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index b70fae8..275d522 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -1,26 +1,39 @@ package task import ( + "fmt" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" + "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type CreatePackageResult = cmd.CreatePackageResult +type CreatePackageResult struct { + PackageID cdssdk.PackageID + Objects []cmd.ObjectUploadResult +} type CreatePackage struct { - cmd cmd.CreatePackage - - Result *CreatePackageResult + userID cdssdk.UserID + bucketID cdssdk.BucketID + name string + objIter iterator.UploadingObjectIterator + nodeAffinity *cdssdk.NodeID + Result *CreatePackageResult } func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { return &CreatePackage{ - cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), + userID: userID, + bucketID: bucketID, + name: name, + objIter: objIter, + nodeAffinity: nodeAffinity, } } @@ -29,12 +42,43 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c log.Debugf("begin") defer log.Debugf("end") - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + err = fmt.Errorf("new coordinator client: %w", err) + log.Warn(err.Error()) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + createResp, err := coorCli.CreatePackage(coordinator.NewCreatePackage(t.userID, t.bucketID, t.name)) + if err != nil { + err = fmt.Errorf("creating package: %w", err) + log.Error(err.Error()) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ Distlock: ctx.distlock, }) - t.Result = ret + if err != nil { + err = fmt.Errorf("uploading objects: %w", err) + log.Error(err.Error()) + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + t.Result.PackageID = createResp.PackageID + t.Result.Objects = uploadRet.Objects - complete(err, CompleteOption{ + complete(nil, CompleteOption{ RemovingDelay: time.Minute, }) } diff --git a/client/internal/cmdline/commandline.go b/client/internal/cmdline/commandline.go index 102154f..2b71d93 100644 --- a/client/internal/cmdline/commandline.go +++ b/client/internal/cmdline/commandline.go @@ -38,3 +38,8 @@ func (c *Commandline) DispatchCommand(allArgs []string) { os.Exit(1) } } + +func MustAddCmd(fn any, prefixWords ...string) any { + commands.MustAdd(fn, prefixWords...) + return nil +} diff --git a/client/internal/cmdline/object.go b/client/internal/cmdline/object.go new file mode 100644 index 0000000..fad17cf --- /dev/null +++ b/client/internal/cmdline/object.go @@ -0,0 +1,58 @@ +package cmdline + +import ( + "fmt" + "os" + "path/filepath" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" +) + +var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath string, nodeAffinity []cdssdk.NodeID) error { + userID := cdssdk.UserID(1) + + var uploadFilePathes []string + err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { + if err != nil { + return nil + } + + if !fi.IsDir() { + uploadFilePathes = append(uploadFilePathes, fname) + } + + return nil + }) + if err != nil { + return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) + } + + var nodeAff *cdssdk.NodeID + if len(nodeAffinity) > 0 { + n := cdssdk.NodeID(nodeAffinity[0]) + nodeAff = &n + } + + objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) + taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploading(userID, packageID, objIter, nodeAff) + if err != nil { + return fmt.Errorf("update objects to package %d failed, err: %w", packageID, err) + } + + for { + complete, _, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploading(taskID, time.Second*5) + if complete { + if err != nil { + return fmt.Errorf("uploading objects: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("wait updating: %w", err) + } + } +}, "obj", "upload") diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index a4a7dfd..a15bdbe 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -5,7 +5,6 @@ import ( "io" "os" "path/filepath" - "time" "github.com/jedib0t/go-pretty/v6/table" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -13,7 +12,7 @@ import ( ) func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) packages, err := ctx.Cmdline.Svc.BucketSvc().GetBucketPackages(userID, bucketID) if err != nil { @@ -34,13 +33,15 @@ func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) err } func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error { + userID := cdssdk.UserID(1) + err := os.MkdirAll(outputDir, os.ModePerm) if err != nil { return fmt.Errorf("create output directory %s failed, err: %w", outputDir, err) } // 下载文件 - objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(0, packageID) + objIter, err := ctx.Cmdline.Svc.PackageSvc().DownloadPackage(userID, packageID) if err != nil { return fmt.Errorf("download object failed, err: %w", err) } @@ -91,108 +92,20 @@ func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outp return nil } -func PackageCreatePackage(ctx CommandContext, name string, rootPath string, bucketID cdssdk.BucketID, nodeAffinity []cdssdk.NodeID) error { - rootPath = filepath.Clean(rootPath) - - var uploadFilePathes []string - err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) - } - - var nodeAff *cdssdk.NodeID - if len(nodeAffinity) > 0 { - n := cdssdk.NodeID(nodeAffinity[0]) - nodeAff = &n - } - - objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingPackage(0, bucketID, name, objIter, nodeAff) +func PackageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string) error { + userID := cdssdk.UserID(1) + pkgID, err := ctx.Cmdline.Svc.PackageSvc().Create(userID, bucketID, name) if err != nil { - return fmt.Errorf("upload file data failed, err: %w", err) - } - - for { - complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("uploading package: %w", err) - } - - tb := table.NewWriter() - - tb.AppendHeader(table.Row{"Path", "ObjectID"}) - for i := 0; i < len(uploadObjectResult.ObjectResults); i++ { - tb.AppendRow(table.Row{ - uploadObjectResult.ObjectResults[i].Info.Path, - uploadObjectResult.ObjectResults[i].ObjectID, - }) - } - fmt.Print(tb.Render()) - fmt.Printf("\n%v", uploadObjectResult.PackageID) - return nil - } - - if err != nil { - return fmt.Errorf("wait uploading: %w", err) - } - } -} - -func PackageUpdatePackage(ctx CommandContext, packageID cdssdk.PackageID, rootPath string) error { - //userID := int64(0) - - var uploadFilePathes []string - err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - return fmt.Errorf("open directory %s failed, err: %w", rootPath, err) - } - - objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartUpdatingPackage(0, packageID, objIter) - if err != nil { - return fmt.Errorf("update package %d failed, err: %w", packageID, err) + return err } - for { - complete, _, err := ctx.Cmdline.Svc.PackageSvc().WaitUpdatingPackage(taskID, time.Second*5) - if complete { - if err != nil { - return fmt.Errorf("updating package: %w", err) - } - - return nil - } - - if err != nil { - return fmt.Errorf("wait updating: %w", err) - } - } + fmt.Printf("%v\n", pkgID) + return nil } func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) err := ctx.Cmdline.Svc.PackageSvc().DeletePackage(userID, packageID) if err != nil { return fmt.Errorf("delete package %d failed, err: %w", packageID, err) @@ -201,7 +114,7 @@ func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error } func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID) fmt.Printf("resp: %v\n", resp) if err != nil { @@ -211,7 +124,7 @@ func PackageGetCachedNodes(ctx CommandContext, packageID cdssdk.PackageID) error } func PackageGetLoadedNodes(ctx CommandContext, packageID cdssdk.PackageID) error { - userID := cdssdk.UserID(0) + userID := cdssdk.UserID(1) nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID) fmt.Printf("nodeIDs: %v\n", nodeIDs) if err != nil { @@ -227,8 +140,6 @@ func init() { commands.MustAdd(PackageCreatePackage, "pkg", "new") - commands.MustAdd(PackageUpdatePackage, "pkg", "update") - commands.MustAdd(PackageDeletePackage, "pkg", "delete") commands.MustAdd(PackageGetCachedNodes, "pkg", "cached") diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go new file mode 100644 index 0000000..7e749b0 --- /dev/null +++ b/client/internal/http/bucket.go @@ -0,0 +1,61 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +type BucketService struct { + *Server +} + +func (s *Server) Bucket() *BucketService { + return &BucketService{ + Server: s, + } +} + +func (s *BucketService) Create(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.Create") + + var req cdssdk.BucketCreateReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + bucketID, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.BucketName) + if err != nil { + log.Warnf("creating bucket: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create bucket failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.BucketCreateResp{ + BucketID: bucketID, + })) +} + +func (s *BucketService) Delete(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.Delete") + + var req cdssdk.BucketDeleteReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + if err := s.svc.BucketSvc().DeleteBucket(req.UserID, req.BucketID); err != nil { + log.Warnf("deleting bucket: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete bucket failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) +} diff --git a/client/internal/http/cache.go b/client/internal/http/cache.go index e1f1210..ab86fda 100644 --- a/client/internal/http/cache.go +++ b/client/internal/http/cache.go @@ -14,7 +14,7 @@ type CacheService struct { *Server } -func (s *Server) CacheSvc() *CacheService { +func (s *Server) Cache() *CacheService { return &CacheService{ Server: s, } diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 9b330ef..d7dba18 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -2,7 +2,9 @@ package http import ( "io" + "mime/multipart" "net/http" + "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -15,12 +17,60 @@ type ObjectService struct { *Server } -func (s *Server) ObjectSvc() *ObjectService { +func (s *Server) Object() *ObjectService { return &ObjectService{ Server: s, } } +type ObjectUploadReq struct { + Info cdssdk.ObjectUploadInfo `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` +} + +func (s *ObjectService) Upload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Upload") + + var req ObjectUploadReq + if err := ctx.ShouldBind(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + var err error + + objIter := mapMultiPartFileToUploadingObject(req.Files) + + taskID, err := s.svc.ObjectSvc().StartUploading(req.Info.UserID, req.Info.PackageID, objIter, req.Info.NodeAffinity) + + if err != nil { + log.Warnf("start uploading object task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) + return + } + + for { + complete, _, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) + if complete { + if err != nil { + log.Warnf("uploading object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading object failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) + return + } + + if err != nil { + log.Warnf("waiting task: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) + return + } + } +} + type ObjectDownloadReq struct { UserID *cdssdk.UserID `form:"userID" binding:"required"` ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"` diff --git a/client/internal/http/package.go b/client/internal/http/package.go index c1c454e..cdaf3e0 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -3,7 +3,6 @@ package http import ( "mime/multipart" "net/http" - "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -19,7 +18,7 @@ type PackageService struct { *Server } -func (s *Server) PackageSvc() *PackageService { +func (s *Server) Package() *PackageService { return &PackageService{ Server: s, } @@ -53,71 +52,25 @@ func (s *PackageService) Get(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg})) } -type PackageUploadReq struct { - Info PackageUploadInfo `form:"info" binding:"required"` - Files []*multipart.FileHeader `form:"files"` -} - -type PackageUploadInfo struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - BucketID *cdssdk.BucketID `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - NodeAffinity *cdssdk.NodeID `json:"nodeAffinity"` -} - -type PackageUploadResp struct { - PackageID cdssdk.PackageID `json:"packageID,string"` -} - -func (s *PackageService) Upload(ctx *gin.Context) { - log := logger.WithField("HTTP", "Package.Upload") - - var req PackageUploadReq - if err := ctx.ShouldBind(&req); err != nil { +func (s *PackageService) Create(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Create") + var req cdssdk.PackageCreateReq + if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - s.uploadEC(ctx, &req) -} - -func (s *PackageService) uploadEC(ctx *gin.Context, req *PackageUploadReq) { - log := logger.WithField("HTTP", "Package.Upload") - - var err error - - objIter := mapMultiPartFileToUploadingObject(req.Files) - - taskID, err := s.svc.PackageSvc().StartCreatingPackage(*req.Info.UserID, *req.Info.BucketID, req.Info.Name, objIter, req.Info.NodeAffinity) - + pkgID, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) if err != nil { - log.Warnf("start uploading ec package task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "start uploading task failed")) + log.Warnf("creating package: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create package failed")) return } - for { - complete, createResult, err := s.svc.PackageSvc().WaitCreatingPackage(taskID, time.Second*5) - if complete { - if err != nil { - log.Warnf("uploading ec package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "uploading ec package failed")) - return - } - - ctx.JSON(http.StatusOK, OK(PackageUploadResp{ - PackageID: createResult.PackageID, - })) - return - } - - if err != nil { - log.Warnf("waiting task: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "wait uploading task failed")) - return - } - } + ctx.JSON(http.StatusOK, OK(cdssdk.PackageCreateResp{ + PackageID: pkgID, + })) } type PackageDeleteReq struct { diff --git a/client/internal/http/server.go b/client/internal/http/server.go index ee1954f..462f76a 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -39,18 +39,22 @@ func (s *Server) Serve() error { } func (s *Server) initRouters() { - s.engine.GET("/object/download", s.ObjectSvc().Download) - s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.ObjectSvc().GetPackageObjects) + s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download) + s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) + s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) - s.engine.GET("/package/get", s.PackageSvc().Get) - s.engine.POST("/package/upload", s.PackageSvc().Upload) - s.engine.POST("/package/delete", s.PackageSvc().Delete) - s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes) - s.engine.GET("/package/getLoadedNodes", s.PackageSvc().GetLoadedNodes) + s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) + s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) + s.engine.POST("/package/delete", s.Package().Delete) + s.engine.GET("/package/getCachedNodes", s.Package().GetCachedNodes) + s.engine.GET("/package/getLoadedNodes", s.Package().GetLoadedNodes) - s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage) - s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) - s.engine.GET("/storage/getInfo", s.StorageSvc().GetInfo) + s.engine.POST("/storage/loadPackage", s.Storage().LoadPackage) + s.engine.POST("/storage/createPackage", s.Storage().CreatePackage) + s.engine.GET("/storage/getInfo", s.Storage().GetInfo) - s.engine.POST(cdssdk.CacheMovePackagePath, s.CacheSvc().MovePackage) + s.engine.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) + + s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) + s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) } diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index 1b30f1c..3ada149 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -14,7 +14,7 @@ type StorageService struct { *Server } -func (s *Server) StorageSvc() *StorageService { +func (s *Server) Storage() *StorageService { return &StorageService{ Server: s, } diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 2b988cf..0dc7374 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -3,10 +3,13 @@ package services import ( "fmt" "io" + "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -18,6 +21,20 @@ func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } +func (svc *ObjectService) StartUploading(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) { + tsk := svc.TaskMgr.StartNew(mytask.NewUploadObjects(userID, packageID, objIter, nodeAffinity)) + return tsk.ID(), nil +} + +func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration) (bool, *mytask.UploadObjectsResult, error) { + tsk := svc.TaskMgr.FindByID(taskID) + if tsk.WaitTimeout(waitTimeout) { + updatePkgTask := tsk.Body().(*mytask.UploadObjects) + return true, updatePkgTask.Result, tsk.Error() + } + return false, nil, nil +} + func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { panic("not implement yet!") } diff --git a/client/internal/services/package.go b/client/internal/services/package.go index da3d420..9667356 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -2,13 +2,10 @@ package services import ( "fmt" - "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - agtcmd "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -37,6 +34,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) return &getResp.Package, nil } +func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return 0, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bucketID, name)) + if err != nil { + return 0, fmt.Errorf("creating package: %w", err) + } + + return resp.PackageID, nil +} + func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -56,34 +68,6 @@ func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssd return iter, nil } -func (svc *PackageService) StartCreatingPackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity)) - return tsk.ID(), nil -} - -func (svc *PackageService) WaitCreatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreatePackageResult, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*mytask.CreatePackage) - return true, cteatePkgTask.Result, tsk.Error() - } - return false, nil, nil -} - -func (svc *PackageService) StartUpdatingPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) (string, error) { - tsk := svc.TaskMgr.StartNew(mytask.NewUpdatePackage(userID, packageID, objIter)) - return tsk.ID(), nil -} - -func (svc *PackageService) WaitUpdatingPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.UpdatePackageResult, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - updatePkgTask := tsk.Body().(*mytask.UpdatePackage) - return true, updatePkgTask.Result, tsk.Error() - } - return false, nil, nil -} - func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/client/internal/task/create_package.go b/client/internal/task/create_package.go deleted file mode 100644 index 1f826b9..0000000 --- a/client/internal/task/create_package.go +++ /dev/null @@ -1,35 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" -) - -type CreatePackageResult = cmd.CreatePackageResult - -type CreatePackage struct { - cmd cmd.CreatePackage - - Result *CreatePackageResult -} - -func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { - return &CreatePackage{ - cmd: *cmd.NewCreatePackage(userID, bucketID, name, objIter, nodeAffinity), - } -} - -func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ - Distlock: ctx.distlock, - }) - t.Result = ret - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/client/internal/task/update_package.go b/client/internal/task/update_package.go deleted file mode 100644 index 518cc98..0000000 --- a/client/internal/task/update_package.go +++ /dev/null @@ -1,36 +0,0 @@ -package task - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" -) - -type UpdatePackageResult = cmd.UpdatePackageResult - -type UpdatePackage struct { - cmd cmd.UpdatePackage - - Result *UpdatePackageResult -} - -func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator) *UpdatePackage { - return &UpdatePackage{ - cmd: *cmd.NewUpdatePackage(userID, packageID, objectIter), - } -} - -func (t *UpdatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ - Distlock: ctx.distlock, - }) - - t.Result = ret - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/client/internal/task/upload_objects.go b/client/internal/task/upload_objects.go new file mode 100644 index 0000000..b138add --- /dev/null +++ b/client/internal/task/upload_objects.go @@ -0,0 +1,36 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/task" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" +) + +type UploadObjectsResult = cmd.UploadObjectsResult + +type UploadObjects struct { + cmd cmd.UploadObjects + + Result *UploadObjectsResult +} + +func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { + return &UploadObjects{ + cmd: *cmd.NewUploadObjects(userID, packageID, objectIter, nodeAffinity), + } +} + +func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{ + Distlock: ctx.distlock, + }) + + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 3652c7a..415b0bf 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -122,6 +122,8 @@ create table Object ( Size bigint not null comment '对象大小(Byte)', FileHash varchar(100) not null comment '完整对象的FileHash', Redundancy JSON not null comment '冗余策略', + CreateTime timestamp not null comment '创建时间', + UpdateTime timestamp not null comment '更新时间', UNIQUE KEY PackagePath (PackageID, Path) ) comment = '对象表'; diff --git a/common/pkgs/cmd/update_package.go b/common/pkgs/cmd/update_package.go deleted file mode 100644 index 015644f..0000000 --- a/common/pkgs/cmd/update_package.go +++ /dev/null @@ -1,88 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/samber/lo" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -type UpdatePackage struct { - userID cdssdk.UserID - packageID cdssdk.PackageID - objectIter iterator.UploadingObjectIterator -} - -type UpdatePackageResult struct { - ObjectResults []ObjectUploadResult -} - -type UpdateNodeInfo struct { - UploadNodeInfo - HasOldObject bool -} - -func NewUpdatePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator) *UpdatePackage { - return &UpdatePackage{ - userID: userID, - packageID: packageID, - objectIter: objIter, - } -} - -func (t *UpdatePackage) Execute(ctx *UpdatePackageContext) (*UpdatePackageResult, error) { - defer t.objectIter.Close() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - - getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) - if err != nil { - return nil, fmt.Errorf("getting user nodes: %w", err) - } - - userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo { - return UploadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == stgglb.Local.LocationID, - } - }) - - // 给上传节点的IPFS加锁 - ipfsReqBlder := reqbuilder.NewBuilder() - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if stgglb.Local.NodeID != nil { - ipfsReqBlder.IPFS().Buzy(*stgglb.Local.NodeID) - } - for _, node := range userNodes { - if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID { - continue - } - - ipfsReqBlder.IPFS().Buzy(node.Node.NodeID) - } - // TODO 加Object的Create锁,最好一次性能加多个 - // 防止上传的副本被清除 - ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - defer ipfsMutex.Unlock() - - rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, nil) - if err != nil { - return nil, err - } - - return &UpdatePackageResult{ - ObjectResults: rets, - }, nil -} diff --git a/common/pkgs/cmd/create_package.go b/common/pkgs/cmd/upload_objects.go similarity index 87% rename from common/pkgs/cmd/create_package.go rename to common/pkgs/cmd/upload_objects.go index f250dc5..b4234f3 100644 --- a/common/pkgs/cmd/create_package.go +++ b/common/pkgs/cmd/upload_objects.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "math/rand" + "time" "github.com/samber/lo" @@ -18,17 +19,15 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type CreatePackage struct { +type UploadObjects struct { userID cdssdk.UserID - bucketID cdssdk.BucketID - name string + packageID cdssdk.PackageID objectIter iterator.UploadingObjectIterator nodeAffinity *cdssdk.NodeID } -type CreatePackageResult struct { - PackageID cdssdk.PackageID - ObjectResults []ObjectUploadResult +type UploadObjectsResult struct { + Objects []ObjectUploadResult } type ObjectUploadResult struct { @@ -43,21 +42,20 @@ type UploadNodeInfo struct { IsSameLocation bool } -type UpdatePackageContext struct { +type UploadObjectsContext struct { Distlock *distlock.Service } -func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *CreatePackage { - return &CreatePackage{ +func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { + return &UploadObjects{ userID: userID, - bucketID: bucketID, - name: name, + packageID: packageID, objectIter: objIter, nodeAffinity: nodeAffinity, } } -func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult, error) { +func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult, error) { defer t.objectIter.Close() coorCli, err := stgglb.CoordinatorMQPool.Acquire() @@ -65,11 +63,6 @@ func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult return nil, fmt.Errorf("new coordinator client: %w", err) } - createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name)) - if err != nil { - return nil, fmt.Errorf("creating package: %w", err) - } - getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { return nil, fmt.Errorf("getting user nodes: %w", err) @@ -103,14 +96,13 @@ func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult } defer ipfsMutex.Unlock() - rets, err := uploadAndUpdatePackage(createPkgResp.PackageID, t.objectIter, userNodes, t.nodeAffinity) + rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userNodes, t.nodeAffinity) if err != nil { return nil, err } - return &CreatePackageResult{ - PackageID: createPkgResp.PackageID, - ObjectResults: rets, + return &UploadObjectsResult{ + Objects: rets, }, nil } @@ -158,6 +150,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo err = func() error { defer objInfo.File.Close() + uploadTime := time.Now() fileHash, err := uploadFile(objInfo.File, uploadNode) if err != nil { return fmt.Errorf("uploading file: %w", err) @@ -168,7 +161,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo Error: err, }) - adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) + adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadTime, uploadNode.Node.NodeID)) return nil }() if err != nil { diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 531d0e0..7b5ee66 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -46,11 +46,7 @@ type UserStorage struct { StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` } -type Bucket struct { - BucketID cdssdk.BucketID `db:"BucketID" json:"bucketID"` - Name string `db:"Name" json:"name"` - CreatorID cdssdk.UserID `db:"CreatorID" json:"creatorID"` -} +type Bucket = cdssdk.Bucket type Package = cdssdk.Package diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 24e3941..20c1afb 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -43,10 +43,10 @@ func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.Packag return objIDs, nil } -func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) { - sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?)" +func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime) values(?,?,?,?,?,?,?)" - ret, err := ctx.Exec(sql, packageID, path, size, redundancy) + ret, err := ctx.Exec(sql, obj.PackageID, obj.Path, obj.Size, obj.FileHash, obj.Redundancy, obj.UpdateTime, obj.UpdateTime) if err != nil { return 0, fmt.Errorf("insert object failed, err: %w", err) } @@ -56,64 +56,18 @@ func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path stri return 0, fmt.Errorf("get id of inserted object failed, err: %w", err) } - return objectID, nil + return cdssdk.ObjectID(objectID), nil } -// 创建或者更新记录,返回值true代表是创建,false代表是更新 -func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string) (cdssdk.ObjectID, bool, error) { - // 首次上传Object时,默认不启用冗余,即使是在更新一个已有的Object也是如此 - defRed := cdssdk.NewNoneRedundancy() - - sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?) on duplicate key update Size = ?, FileHash = ?, Redundancy = ?" - - ret, err := ctx.Exec(sql, packageID, path, size, fileHash, defRed, size, fileHash, defRed) - if err != nil { - return 0, false, fmt.Errorf("insert object failed, err: %w", err) - } - - affs, err := ret.RowsAffected() - if err != nil { - return 0, false, fmt.Errorf("getting affected rows: %w", err) - } - - // 影响行数为1时是插入,为2时是更新 - if affs == 1 { - objectID, err := ret.LastInsertId() - if err != nil { - return 0, false, fmt.Errorf("get id of inserted object failed, err: %w", err) - } - return cdssdk.ObjectID(objectID), true, nil - } - - var objID cdssdk.ObjectID - if err = sqlx.Get(ctx, &objID, "select ObjectID from Object where PackageID = ? and Path = ?", packageID, path); err != nil { - return 0, false, fmt.Errorf("getting object id: %w", err) - } - - return objID, false, nil -} - -// 批量创建或者更新记录 +// 可以用于批量创建或者更新记录 +// 用于创建时,需要额外检查PackageID+Path的唯一性 +// 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { - sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy)" + - " values(:PackageID,:Path,:Size,:FileHash,:Redundancy)" + - " on duplicate key update Size = values(Size), FileHash = values(FileHash), Redundancy = values(Redundancy)" - - return BatchNamedExec(ctx, sql, 5, objs, nil) -} - -func (*ObjectDB) UpdateFileInfo(ctx SQLContext, objectID cdssdk.ObjectID, fileSize int64) (bool, error) { - ret, err := ctx.Exec("update Object set FileSize = ? where ObjectID = ?", fileSize, objectID) - if err != nil { - return false, err - } - - cnt, err := ret.RowsAffected() - if err != nil { - return false, fmt.Errorf("get affected rows failed, err: %w", err) - } + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + + " values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + + " on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" - return cnt > 0, nil + return BatchNamedExec(ctx, sql, 7, objs, nil) } func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { @@ -183,6 +137,8 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] Size: add.Size, FileHash: add.FileHash, Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式 + CreateTime: add.UploadTime, + UpdateTime: add.UploadTime, }) } @@ -256,9 +212,9 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb // 目前只能使用这种方式来同时更新大量数据 err := BatchNamedExec(ctx, - "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy)"+ - " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy) as new"+ - " on duplicate key update Redundancy=new.Redundancy", 6, dummyObjs, nil) + "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, UpdateTime)"+ + " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :UpdateTime) as new"+ + " on duplicate key update Redundancy=new.Redundancy", 7, dummyObjs, nil) if err != nil { return fmt.Errorf("batch update object redundancy: %w", err) } diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index e831069..3d01e5d 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -1,6 +1,8 @@ package coordinator import ( + "time" + "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -92,10 +94,11 @@ type UpdatePackageResp struct { mq.MessageBodyBase } type AddObjectEntry struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - FileHash string `json:"fileHash"` - NodeID cdssdk.NodeID `json:"nodeID"` + Path string `json:"path"` + Size int64 `json:"size,string"` + FileHash string `json:"fileHash"` + UploadTime time.Time `json:"uploadTime"` // 开始上传文件的时间 + NodeID cdssdk.NodeID `json:"nodeID"` } func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { @@ -108,12 +111,13 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes func NewUpdatePackageResp() *UpdatePackageResp { return &UpdatePackageResp{} } -func NewAddObjectEntry(path string, size int64, fileHash string, nodeIDs cdssdk.NodeID) AddObjectEntry { +func NewAddObjectEntry(path string, size int64, fileHash string, uploadTime time.Time, nodeID cdssdk.NodeID) AddObjectEntry { return AddObjectEntry{ - Path: path, - Size: size, - FileHash: fileHash, - NodeID: nodeIDs, + Path: path, + Size: size, + FileHash: fileHash, + UploadTime: uploadTime, + NodeID: nodeID, } } func (client *Client) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, error) { diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index b1cc02c..9d82041 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -74,7 +74,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } // TODO UserID - getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(0)) + getNodes, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(1)) if err != nil { log.Warnf("getting all nodes: %s", err.Error()) return From 2166767aaed76de9263b2e63c5c8c2735b0e829d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 20 Mar 2024 09:48:45 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E9=97=B4=E5=BB=B6=E8=BF=9F=E7=9A=84=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/config/config.go | 2 + agent/internal/grpc/ping.go | 11 ++ agent/internal/task/create_package.go | 3 +- agent/internal/task/task.go | 13 +- agent/main.go | 42 ++++- client/internal/config/config.go | 14 +- client/internal/task/task.go | 9 +- client/internal/task/upload_objects.go | 3 +- client/main.go | 7 +- common/assets/confs/agent.config.json | 3 + common/assets/confs/client.config.json | 3 + common/assets/scripts/create_database.sql | 13 +- common/pkgs/cmd/upload_objects.go | 26 ++- common/pkgs/connectivity/collector.go | 198 ++++++++++++++++++++++ common/pkgs/connectivity/config.go | 5 + common/pkgs/db/model/model.go | 8 +- common/pkgs/db/node_connectivity.go | 32 ++++ common/pkgs/grpc/agent/agent.pb.go | 175 ++++++++++++++++--- common/pkgs/grpc/agent/agent.proto | 7 + common/pkgs/grpc/agent/agent_grpc.pb.go | 40 ++++- common/pkgs/grpc/agent/client.go | 8 + common/pkgs/mq/coordinator/node.go | 53 ++++++ coordinator/internal/mq/node.go | 49 ++++++ 23 files changed, 660 insertions(+), 64 deletions(-) create mode 100644 agent/internal/grpc/ping.go create mode 100644 common/pkgs/connectivity/collector.go create mode 100644 common/pkgs/connectivity/config.go create mode 100644 common/pkgs/db/node_connectivity.go diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index f8c04a9..141d5de 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -6,6 +6,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" c "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/grpc" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -19,6 +20,7 @@ type Config struct { RabbitMQ stgmq.Config `json:"rabbitMQ"` IPFS ipfs.Config `json:"ipfs"` DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` } var cfg Config diff --git a/agent/internal/grpc/ping.go b/agent/internal/grpc/ping.go new file mode 100644 index 0000000..ff0fd20 --- /dev/null +++ b/agent/internal/grpc/ping.go @@ -0,0 +1,11 @@ +package grpc + +import ( + "context" + + agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" +) + +func (s *Service) Ping(context.Context, *agtrpc.PingReq) (*agtrpc.PingResp, error) { + return &agtrpc.PingResp{}, nil +} diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 275d522..1ddd848 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -64,7 +64,8 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c } uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ - Distlock: ctx.distlock, + Distlock: ctx.distlock, + Connectivity: ctx.connectivity, }) if err != nil { err = fmt.Errorf("uploading objects: %w", err) diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index f41112c..469c0d0 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -3,12 +3,14 @@ package task import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type TaskContext struct { - distlock *distlock.Service - sw *ioswitch.Switch + distlock *distlock.Service + sw *ioswitch.Switch + connectivity *connectivity.Collector } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -23,9 +25,10 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, sw *ioswitch.Switch) Manager { +func NewManager(distlock *distlock.Service, sw *ioswitch.Switch, connectivity *connectivity.Collector) Manager { return task.NewManager(TaskContext{ - distlock: distlock, - sw: sw, + distlock: distlock, + sw: sw, + connectivity: connectivity, }) } diff --git a/agent/main.go b/agent/main.go index 1949f69..b1aa82b 100644 --- a/agent/main.go +++ b/agent/main.go @@ -7,9 +7,11 @@ import ( "sync" log "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/agent/internal/config" "gitlink.org.cn/cloudream/storage/agent/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" @@ -20,6 +22,7 @@ import ( "google.golang.org/grpc" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc" cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq" @@ -49,6 +52,41 @@ func main() { stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) stgglb.InitIPFSPool(&config.Cfg().IPFS) + // 启动网络连通性检测,并就地检测一次 + conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { + log := log.WithField("Connectivity", "") + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + log.Warnf("acquire coordinator mq failed, err: %s", err.Error()) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + cons := collector.GetAll() + nodeCons := make([]cdssdk.NodeConnectivity, 0, len(cons)) + for _, con := range cons { + var delay *float32 + if con.Delay != nil { + v := float32(con.Delay.Microseconds()) / 1000 + delay = &v + } + + nodeCons = append(nodeCons, cdssdk.NodeConnectivity{ + FromNodeID: *stgglb.Local.NodeID, + ToNodeID: con.ToNodeID, + Delay: delay, + TestTime: con.TestTime, + }) + } + + _, err = coorCli.UpdateNodeConnectivities(coormq.ReqUpdateNodeConnectivities(nodeCons)) + if err != nil { + log.Warnf("update node connectivities: %v", err) + } + }) + conCol.CollectInPlace() + distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { log.Fatalf("new ipfs failed, err: %s", err.Error()) @@ -60,7 +98,7 @@ func main() { wg := sync.WaitGroup{} wg.Add(4) - taskMgr := task.NewManager(distlock, &sw) + taskMgr := task.NewManager(distlock, &sw, &conCol) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 @@ -74,8 +112,6 @@ func main() { go serveAgentServer(agtSvr, &wg) - // go reportStatus(&wg) //网络延迟感知 - //面向客户端收发数据 listenAddr := config.Cfg().GRPC.MakeListenAddress() lis, err := net.Listen("tcp", listenAddr) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index c08d366..cf6d64a 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -6,17 +6,19 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) type Config struct { - Local stgmodels.LocalMachineInfo `json:"local"` - AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` - Logger logger.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon - DistLock distlock.Config `json:"distlock"` + Local stgmodels.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + Logger logger.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon + DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` } var cfg Config diff --git a/client/internal/task/task.go b/client/internal/task/task.go index 7f0a27a..a1ec9f6 100644 --- a/client/internal/task/task.go +++ b/client/internal/task/task.go @@ -3,10 +3,12 @@ package task import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" ) type TaskContext struct { - distlock *distlock.Service + distlock *distlock.Service + connectivity *connectivity.Collector } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -21,8 +23,9 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector) Manager { return task.NewManager(TaskContext{ - distlock: distlock, + distlock: distlock, + connectivity: connectivity, }) } diff --git a/client/internal/task/upload_objects.go b/client/internal/task/upload_objects.go index b138add..ca0f8a6 100644 --- a/client/internal/task/upload_objects.go +++ b/client/internal/task/upload_objects.go @@ -25,7 +25,8 @@ func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIt func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{ - Distlock: ctx.distlock, + Distlock: ctx.distlock, + Connectivity: ctx.connectivity, }) t.Result = ret diff --git a/client/main.go b/client/main.go index 9fe03d5..101b013 100644 --- a/client/main.go +++ b/client/main.go @@ -12,6 +12,7 @@ import ( "gitlink.org.cn/cloudream/storage/client/internal/services" "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" ) @@ -37,6 +38,10 @@ func main() { stgglb.InitIPFSPool(config.Cfg().IPFS) } + // 启动网络连通性检测,并就地检测一次 + conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) + conCol.CollectInPlace() + distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { logger.Warnf("new distlock service failed, err: %s", err.Error()) @@ -44,7 +49,7 @@ func main() { } go serveDistLock(distlockSvc) - taskMgr := task.NewManager(distlockSvc) + taskMgr := task.NewManager(distlockSvc, &conCol) svc, err := services.NewService(distlockSvc, &taskMgr) if err != nil { diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index 226711e..e54c953 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -32,5 +32,8 @@ "etcdLockLeaseTimeSec": 5, "randomReleasingDelayMs": 3000, "serviceDescription": "I am a agent" + }, + "connectivity": { + "testInterval": 300 } } \ No newline at end of file diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index d7ce483..9791f83 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -25,5 +25,8 @@ "etcdLockLeaseTimeSec": 5, "randomReleasingDelayMs": 3000, "serviceDescription": "I am a client" + }, + "connectivity": { + "testInterval": 300 } } \ No newline at end of file diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 415b0bf..42f3249 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -52,12 +52,13 @@ insert into values (1, "HuaWei-Cloud", 1, "/", "Online"); -create table NodeDelay ( - SourceNodeID int not null comment '发起检测的节点ID', - DestinationNodeID int not null comment '被检测节点的ID', - DelayInMs int not null comment '发起节点与被检测节点间延迟(毫秒)', - primary key(SourceNodeID, DestinationNodeID) -) comment = '节点延迟表'; +create table NodeConnectivity ( + FromNodeID int not null comment '发起检测的节点ID', + ToNodeID int not null comment '被检测节点的ID', + Delay float comment '发起节点与被检测节点间延迟(毫秒),为null代表节点不可达', + TestTime timestamp comment '进行连通性测试的时间', + primary key(FromNodeID, ToNodeID) +) comment = '节点连通性表'; create table User ( UserID int not null primary key comment '用户ID', diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index b4234f3..59a66bc 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "io" + "math" "math/rand" "time" @@ -11,8 +12,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sort2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -39,11 +42,13 @@ type ObjectUploadResult struct { type UploadNodeInfo struct { Node cdssdk.Node + Delay time.Duration IsSameLocation bool } type UploadObjectsContext struct { - Distlock *distlock.Service + Distlock *distlock.Service + Connectivity *connectivity.Collector } func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { @@ -68,12 +73,24 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult return nil, fmt.Errorf("getting user nodes: %w", err) } + cons := ctx.Connectivity.GetAll() userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo { + delay := time.Duration(math.MaxInt64) + + con, ok := cons[node.NodeID] + if ok && con.Delay != nil { + delay = *con.Delay + } + return UploadNodeInfo{ Node: node, + Delay: delay, IsSameLocation: node.LocationID == stgglb.Local.LocationID, } }) + if len(userNodes) == 0 { + return nil, fmt.Errorf("user no available nodes") + } // 给上传节点的IPFS加锁 ipfsReqBlder := reqbuilder.NewBuilder() @@ -109,7 +126,7 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult // chooseUploadNode 选择一个上传文件的节点 // 1. 选择设置了亲和性的节点 // 2. 从与当前客户端相同地域的节点中随机选一个 -// 3. 没有用的话从所有节点中随机选一个 +// 3. 没有的话从所有节点选择延迟最低的节点 func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) UploadNodeInfo { if nodeAffinity != nil { aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity }) @@ -123,7 +140,10 @@ func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) Uploa return sameLocationNodes[rand.Intn(len(sameLocationNodes))] } - return nodes[rand.Intn(len(nodes))] + // 选择延迟最低的节点 + nodes = sort2.Sort(nodes, func(e1, e2 UploadNodeInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) }) + + return nodes[0] } func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) { diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go new file mode 100644 index 0000000..20894d7 --- /dev/null +++ b/common/pkgs/connectivity/collector.go @@ -0,0 +1,198 @@ +package connectivity + +import ( + "math/rand" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type Connectivity struct { + ToNodeID cdssdk.NodeID + Delay *time.Duration + TestTime time.Time +} + +type Collector struct { + cfg *Config + onCollected func(collector *Collector) + collectNow chan any + close chan any + connectivities map[cdssdk.NodeID]Connectivity + lock *sync.RWMutex +} + +func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector { + rpt := Collector{ + cfg: cfg, + collectNow: make(chan any), + close: make(chan any), + connectivities: make(map[cdssdk.NodeID]Connectivity), + lock: &sync.RWMutex{}, + onCollected: onCollected, + } + go rpt.serve() + return rpt +} + +func (r *Collector) Get(nodeID cdssdk.NodeID) *Connectivity { + r.lock.RLock() + defer r.lock.RUnlock() + + con, ok := r.connectivities[nodeID] + if ok { + return &con + } + + return nil +} +func (r *Collector) GetAll() map[cdssdk.NodeID]Connectivity { + r.lock.RLock() + defer r.lock.RUnlock() + + ret := make(map[cdssdk.NodeID]Connectivity) + for k, v := range r.connectivities { + ret[k] = v + } + + return ret +} + +// 启动一次收集 +func (r *Collector) CollecNow() { + select { + case r.collectNow <- nil: + default: + } +} + +// 就地进行收集,会阻塞当前线程 +func (r *Collector) CollectInPlace() { + r.testing() +} + +func (r *Collector) Close() { + select { + case r.close <- nil: + default: + } +} + +func (r *Collector) serve() { + log := logger.WithType[Collector]("") + log.Info("start connectivity reporter") + + // 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机 + startup := true + firstReportDelay := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64()) + ticker := time.NewTicker(firstReportDelay) + +loop: + for { + select { + case <-ticker.C: + r.testing() + if startup { + startup = false + ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second) + } + + case <-r.collectNow: + r.testing() + + case <-r.close: + ticker.Stop() + break loop + } + } + + log.Info("stop connectivity reporter") +} + +func (r *Collector) testing() { + log := logger.WithType[Collector]("") + log.Debug("do testing") + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getNodeResp, err := coorCli.GetNodes(coormq.NewGetNodes(nil)) + if err != nil { + return + } + + wg := sync.WaitGroup{} + cons := make([]Connectivity, len(getNodeResp.Nodes)) + for i, node := range getNodeResp.Nodes { + tmpIdx := i + tmpNode := node + + wg.Add(1) + go func() { + defer wg.Done() + cons[tmpIdx] = r.ping(tmpNode) + }() + } + + wg.Wait() + + r.lock.Lock() + // 删除所有node的记录,然后重建,避免node数量变化时导致残余数据 + r.connectivities = make(map[cdssdk.NodeID]Connectivity) + for _, con := range cons { + r.connectivities[con.ToNodeID] = con + } + r.lock.Unlock() + + if r.onCollected != nil { + r.onCollected(r) + } +} + +func (r *Collector) ping(node cdssdk.Node) Connectivity { + log := logger.WithType[Collector]("").WithField("NodeID", node.NodeID) + + ip := node.ExternalIP + port := node.ExternalGRPCPort + if node.LocationID == stgglb.Local.LocationID { + ip = node.LocalIP + port = node.LocalGRPCPort + } + + agtCli, err := stgglb.AgentRPCPool.Acquire(ip, port) + if err != nil { + log.Warnf("new agent %v:%v rpc client: %w", ip, port, err) + return Connectivity{ + ToNodeID: node.NodeID, + Delay: nil, + TestTime: time.Now(), + } + } + defer stgglb.AgentRPCPool.Release(agtCli) + + start := time.Now() + err = agtCli.Ping(*stgglb.Local.NodeID) + if err != nil { + log.Warnf("ping: %v", err) + return Connectivity{ + ToNodeID: node.NodeID, + Delay: nil, + TestTime: time.Now(), + } + } + + // 此时间差为一个来回的时间,因此单程延迟需要除以2 + delay := time.Since(start) / 2 + return Connectivity{ + ToNodeID: node.NodeID, + Delay: &delay, + TestTime: time.Now(), + } +} diff --git a/common/pkgs/connectivity/config.go b/common/pkgs/connectivity/config.go new file mode 100644 index 0000000..c839fcb --- /dev/null +++ b/common/pkgs/connectivity/config.go @@ -0,0 +1,5 @@ +package connectivity + +type Config struct { + TestInterval int `json:"testInterval"` // 进行测试的间隔 +} diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 7b5ee66..a48946f 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -20,12 +20,6 @@ type Storage struct { State string `db:"State" json:"state"` } -type NodeDelay struct { - SourceNodeID int64 `db:"SourceNodeID"` - DestinationNodeID int64 `db:"DestinationNodeID"` - DelayInMs int `db:"DelayInMs"` -} - type User struct { UserID cdssdk.UserID `db:"UserID" json:"userID"` Password string `db:"PassWord" json:"password"` @@ -52,6 +46,8 @@ type Package = cdssdk.Package type Object = cdssdk.Object +type NodeConnectivity = cdssdk.NodeConnectivity + // 由于Object的Redundancy字段是interface,所以不能直接将查询结果scan成Object,必须先scan成TempObject, // 再.ToObject()转成Object type TempObject struct { diff --git a/common/pkgs/db/node_connectivity.go b/common/pkgs/db/node_connectivity.go new file mode 100644 index 0000000..ffc3307 --- /dev/null +++ b/common/pkgs/db/node_connectivity.go @@ -0,0 +1,32 @@ +package db + +import ( + "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" +) + +type NodeConnectivityDB struct { + *DB +} + +func (db *DB) NodeConnectivity() *NodeConnectivityDB { + return &NodeConnectivityDB{DB: db} +} + +func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) { + var ret []model.NodeConnectivity + + sql, args, err := sqlx.In("select * from NodeConnectivity where NodeID in (?)", nodeIDs) + if err != nil { + return nil, err + } + + return ret, sqlx.Select(ctx, &ret, sql, args...) +} + +func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.NodeConnectivity) error { + return BatchNamedExec(ctx, + "insert into NodeConnectivity(FromNodeID, ToNodeID, Delay, TestTime) values(:FromNodeID, :ToNodeID, :Delay, :TestTime) as new"+ + " on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil) +} diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index c5f933b..1e70002 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -386,6 +386,91 @@ func (x *FetchStreamReq) GetStreamID() string { return "" } +type PingReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FromNodeID int64 `protobuf:"varint,1,opt,name=FromNodeID,proto3" json:"FromNodeID,omitempty"` +} + +func (x *PingReq) Reset() { + *x = PingReq{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingReq) ProtoMessage() {} + +func (x *PingReq) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingReq.ProtoReflect.Descriptor instead. +func (*PingReq) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{6} +} + +func (x *PingReq) GetFromNodeID() int64 { + if x != nil { + return x.FromNodeID + } + return 0 +} + +type PingResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *PingResp) Reset() { + *x = PingResp{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingResp) ProtoMessage() {} + +func (x *PingResp) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingResp.ProtoReflect.Descriptor instead. +func (*PingResp) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{7} +} + var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ @@ -415,26 +500,32 @@ var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x2a, 0x37, 0x0a, 0x14, - 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, - 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, - 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0xe1, 0x01, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, - 0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, - 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, - 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, - 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, - 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, - 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a, - 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, - 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, - 0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, - 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, - 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x22, 0x29, 0x0a, 0x07, + 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72, 0x6f, 0x6d, 0x4e, + 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x46, 0x72, 0x6f, + 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x73, 0x70, 0x2a, 0x37, 0x0a, 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, + 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, + 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, + 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0x80, 0x02, 0x0a, + 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, + 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, + 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, + 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, + 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, + 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, + 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, + 0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, + 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74, + 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, + 0x12, 0x1d, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x71, 0x1a, 0x09, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x42, + 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -450,7 +541,7 @@ func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte { } var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ (StreamDataPacketType)(0), // 0: StreamDataPacketType (*FileDataPacket)(nil), // 1: FileDataPacket @@ -459,6 +550,8 @@ var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ (*StreamDataPacket)(nil), // 4: StreamDataPacket (*SendStreamResp)(nil), // 5: SendStreamResp (*FetchStreamReq)(nil), // 6: FetchStreamReq + (*PingReq)(nil), // 7: PingReq + (*PingResp)(nil), // 8: PingResp } var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ 0, // 0: FileDataPacket.Type:type_name -> StreamDataPacketType @@ -467,12 +560,14 @@ var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ 3, // 3: Agent.GetIPFSFile:input_type -> GetIPFSFileReq 4, // 4: Agent.SendStream:input_type -> StreamDataPacket 6, // 5: Agent.FetchStream:input_type -> FetchStreamReq - 2, // 6: Agent.SendIPFSFile:output_type -> SendIPFSFileResp - 1, // 7: Agent.GetIPFSFile:output_type -> FileDataPacket - 5, // 8: Agent.SendStream:output_type -> SendStreamResp - 4, // 9: Agent.FetchStream:output_type -> StreamDataPacket - 6, // [6:10] is the sub-list for method output_type - 2, // [2:6] is the sub-list for method input_type + 7, // 6: Agent.Ping:input_type -> PingReq + 2, // 7: Agent.SendIPFSFile:output_type -> SendIPFSFileResp + 1, // 8: Agent.GetIPFSFile:output_type -> FileDataPacket + 5, // 9: Agent.SendStream:output_type -> SendStreamResp + 4, // 10: Agent.FetchStream:output_type -> StreamDataPacket + 8, // 11: Agent.Ping:output_type -> PingResp + 7, // [7:12] is the sub-list for method output_type + 2, // [2:7] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name 2, // [2:2] is the sub-list for extension extendee 0, // [0:2] is the sub-list for field type_name @@ -556,6 +651,30 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } + file_pkgs_grpc_agent_agent_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -563,7 +682,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc, NumEnums: 1, - NumMessages: 6, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index f756fec..7f08bde 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -40,11 +40,18 @@ message FetchStreamReq { string StreamID = 2; } +message PingReq { + int64 FromNodeID = 1; +} +message PingResp {} + service Agent { rpc SendIPFSFile(stream FileDataPacket)returns(SendIPFSFileResp){} rpc GetIPFSFile(GetIPFSFileReq)returns(stream FileDataPacket){} rpc SendStream(stream StreamDataPacket)returns(SendStreamResp){} rpc FetchStream(FetchStreamReq)returns(stream StreamDataPacket){} + + rpc Ping(PingReq) returns(PingResp){} } diff --git a/common/pkgs/grpc/agent/agent_grpc.pb.go b/common/pkgs/grpc/agent/agent_grpc.pb.go index 95b2f92..81d5910 100644 --- a/common/pkgs/grpc/agent/agent_grpc.pb.go +++ b/common/pkgs/grpc/agent/agent_grpc.pb.go @@ -25,6 +25,7 @@ const ( Agent_GetIPFSFile_FullMethodName = "/Agent/GetIPFSFile" Agent_SendStream_FullMethodName = "/Agent/SendStream" Agent_FetchStream_FullMethodName = "/Agent/FetchStream" + Agent_Ping_FullMethodName = "/Agent/Ping" ) // AgentClient is the client API for Agent service. @@ -35,6 +36,7 @@ type AgentClient interface { GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error) SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) FetchStream(ctx context.Context, in *FetchStreamReq, opts ...grpc.CallOption) (Agent_FetchStreamClient, error) + Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) } type agentClient struct { @@ -177,6 +179,15 @@ func (x *agentFetchStreamClient) Recv() (*StreamDataPacket, error) { return m, nil } +func (c *agentClient) Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) { + out := new(PingResp) + err := c.cc.Invoke(ctx, Agent_Ping_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // AgentServer is the server API for Agent service. // All implementations must embed UnimplementedAgentServer // for forward compatibility @@ -185,6 +196,7 @@ type AgentServer interface { GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error SendStream(Agent_SendStreamServer) error FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error + Ping(context.Context, *PingReq) (*PingResp, error) mustEmbedUnimplementedAgentServer() } @@ -204,6 +216,9 @@ func (UnimplementedAgentServer) SendStream(Agent_SendStreamServer) error { func (UnimplementedAgentServer) FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error { return status.Errorf(codes.Unimplemented, "method FetchStream not implemented") } +func (UnimplementedAgentServer) Ping(context.Context, *PingReq) (*PingResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {} // UnsafeAgentServer may be embedded to opt out of forward compatibility for this service. @@ -311,13 +326,36 @@ func (x *agentFetchStreamServer) Send(m *StreamDataPacket) error { return x.ServerStream.SendMsg(m) } +func _Agent_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AgentServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Agent_Ping_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AgentServer).Ping(ctx, req.(*PingReq)) + } + return interceptor(ctx, in, info, handler) +} + // Agent_ServiceDesc is the grpc.ServiceDesc for Agent service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var Agent_ServiceDesc = grpc.ServiceDesc{ ServiceName: "Agent", HandlerType: (*AgentServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "Ping", + Handler: _Agent_Ping_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "SendIPFSFile", diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index 440e49a..74ea749 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -5,6 +5,7 @@ import ( "fmt" "io" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -209,6 +210,13 @@ func (c *Client) FetchStream(planID ioswitch.PlanID, streamID ioswitch.StreamID) }, nil } +func (c *Client) Ping(fromNodeID cdssdk.NodeID) error { + _, err := c.cli.Ping(context.Background(), &PingReq{ + FromNodeID: int64(fromNodeID), + }) + return err +} + func (c *Client) Close() { c.con.Close() } diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/node.go index 558949a..53c0ddd 100644 --- a/common/pkgs/mq/coordinator/node.go +++ b/common/pkgs/mq/coordinator/node.go @@ -9,6 +9,10 @@ type NodeService interface { GetUserNodes(msg *GetUserNodes) (*GetUserNodesResp, *mq.CodeMessage) GetNodes(msg *GetNodes) (*GetNodesResp, *mq.CodeMessage) + + GetNodeConnectivities(msg *GetNodeConnectivities) (*GetNodeConnectivitiesResp, *mq.CodeMessage) + + UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, *mq.CodeMessage) } // 查询用户可用的节点 @@ -71,3 +75,52 @@ func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *cdssdk.Node { func (client *Client) GetNodes(msg *GetNodes) (*GetNodesResp, error) { return mq.Request(Service.GetNodes, client.rabbitCli, msg) } + +// 获取节点连通性信息 +var _ = Register(Service.GetNodeConnectivities) + +type GetNodeConnectivities struct { + mq.MessageBodyBase + NodeIDs []cdssdk.NodeID `json:"nodeIDs"` +} +type GetNodeConnectivitiesResp struct { + mq.MessageBodyBase + Connectivities []cdssdk.NodeConnectivity `json:"nodes"` +} + +func ReqGetNodeConnectivities(nodeIDs []cdssdk.NodeID) *GetNodeConnectivities { + return &GetNodeConnectivities{ + NodeIDs: nodeIDs, + } +} +func RespGetNodeConnectivities(cons []cdssdk.NodeConnectivity) *GetNodeConnectivitiesResp { + return &GetNodeConnectivitiesResp{ + Connectivities: cons, + } +} +func (client *Client) GetNodeConnectivities(msg *GetNodeConnectivities) (*GetNodeConnectivitiesResp, error) { + return mq.Request(Service.GetNodeConnectivities, client.rabbitCli, msg) +} + +// 批量更新节点连通性信息 +var _ = Register(Service.UpdateNodeConnectivities) + +type UpdateNodeConnectivities struct { + mq.MessageBodyBase + Connectivities []cdssdk.NodeConnectivity `json:"connectivities"` +} +type UpdateNodeConnectivitiesResp struct { + mq.MessageBodyBase +} + +func ReqUpdateNodeConnectivities(cons []cdssdk.NodeConnectivity) *UpdateNodeConnectivities { + return &UpdateNodeConnectivities{ + Connectivities: cons, + } +} +func RespUpdateNodeConnectivities() *UpdateNodeConnectivitiesResp { + return &UpdateNodeConnectivitiesResp{} +} +func (client *Client) UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, error) { + return mq.Request(Service.UpdateNodeConnectivities, client.rabbitCli, msg) +} diff --git a/coordinator/internal/mq/node.go b/coordinator/internal/mq/node.go index 778c956..0b51245 100644 --- a/coordinator/internal/mq/node.go +++ b/coordinator/internal/mq/node.go @@ -1,6 +1,10 @@ package mq import ( + "database/sql" + "fmt" + + "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" @@ -46,3 +50,48 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co return mq.ReplyOK(coormq.NewGetNodesResp(nodes)) } + +func (svc *Service) GetNodeConnectivities(msg *coormq.GetNodeConnectivities) (*coormq.GetNodeConnectivitiesResp, *mq.CodeMessage) { + cons, err := svc.db.NodeConnectivity().BatchGetByFromNode(svc.db.SQLCtx(), msg.NodeIDs) + if err != nil { + logger.Warnf("batch get node connectivities by from node: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch get node connectivities by from node failed") + } + + return mq.ReplyOK(coormq.RespGetNodeConnectivities(cons)) +} + +func (svc *Service) UpdateNodeConnectivities(msg *coormq.UpdateNodeConnectivities) (*coormq.UpdateNodeConnectivitiesResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + // 只有发起节点和目的节点都存在,才能插入这条记录到数据库 + allNodes, err := svc.db.Node().GetAllNodes(tx) + if err != nil { + return fmt.Errorf("getting all nodes: %w", err) + } + + allNodeID := make(map[cdssdk.NodeID]bool) + for _, node := range allNodes { + allNodeID[node.NodeID] = true + } + + var avaiCons []cdssdk.NodeConnectivity + for _, con := range msg.Connectivities { + if allNodeID[con.FromNodeID] && allNodeID[con.ToNodeID] { + avaiCons = append(avaiCons, con) + } + } + + err = svc.db.NodeConnectivity().BatchUpdateOrCreate(tx, avaiCons) + if err != nil { + return fmt.Errorf("batch update or create node connectivities: %s", err) + } + + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + + return mq.ReplyOK(coormq.RespUpdateNodeConnectivities()) +} From 3c4696ee427049af99db4dc088ea9e2d2ad41f3b Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 20 Mar 2024 10:35:02 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E4=BC=98=E5=8C=96ping=E8=BF=87=E7=A8=8B?= =?UTF-8?q?=E7=9A=84=E5=BB=B6=E8=BF=9F=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/connectivity/collector.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index 20894d7..a6c4de5 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -177,6 +177,18 @@ func (r *Collector) ping(node cdssdk.Node) Connectivity { } defer stgglb.AgentRPCPool.Release(agtCli) + // 第一次ping保证网络连接建立成功 + err = agtCli.Ping(*stgglb.Local.NodeID) + if err != nil { + log.Warnf("pre ping: %v", err) + return Connectivity{ + ToNodeID: node.NodeID, + Delay: nil, + TestTime: time.Now(), + } + } + + // 第二次ping计算延迟 start := time.Now() err = agtCli.Ping(*stgglb.Local.NodeID) if err != nil { From 01794c2cf273d674a777f43ec42cd8a5ca1ff812 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 20 Mar 2024 10:57:04 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E4=BC=98=E5=8C=96ping=E8=BF=87=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/connectivity/collector.go | 31 +++++++++++++++++---------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index a6c4de5..e8c14fc 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -188,20 +188,29 @@ func (r *Collector) ping(node cdssdk.Node) Connectivity { } } - // 第二次ping计算延迟 - start := time.Now() - err = agtCli.Ping(*stgglb.Local.NodeID) - if err != nil { - log.Warnf("ping: %v", err) - return Connectivity{ - ToNodeID: node.NodeID, - Delay: nil, - TestTime: time.Now(), + // 后几次ping计算延迟 + var avgDelay time.Duration + for i := 0; i < 3; i++ { + start := time.Now() + err = agtCli.Ping(*stgglb.Local.NodeID) + if err != nil { + log.Warnf("ping: %v", err) + return Connectivity{ + ToNodeID: node.NodeID, + Delay: nil, + TestTime: time.Now(), + } } + + // 此时间差为一个来回的时间,因此单程延迟需要除以2 + delay := time.Since(start) / 2 + avgDelay += delay + + // 每次ping之间间隔1秒 + <-time.After(time.Second) } + delay := avgDelay / 3 - // 此时间差为一个来回的时间,因此单程延迟需要除以2 - delay := time.Since(start) / 2 return Connectivity{ ToNodeID: node.NodeID, Delay: &delay, From ecab2e0bc9dce149701557d156ce1a31ae4d27fe Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 21 Mar 2024 09:57:56 +0800 Subject: [PATCH 6/9] =?UTF-8?q?ping=E6=8E=A5=E5=8F=A3=E4=B8=8D=E9=9C=80?= =?UTF-8?q?=E8=A6=81=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/connectivity/collector.go | 4 ++-- common/pkgs/grpc/agent/agent.pb.go | 15 ++------------- common/pkgs/grpc/agent/agent.proto | 4 +--- common/pkgs/grpc/agent/client.go | 7 ++----- 4 files changed, 7 insertions(+), 23 deletions(-) diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index e8c14fc..6234cd9 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -178,7 +178,7 @@ func (r *Collector) ping(node cdssdk.Node) Connectivity { defer stgglb.AgentRPCPool.Release(agtCli) // 第一次ping保证网络连接建立成功 - err = agtCli.Ping(*stgglb.Local.NodeID) + err = agtCli.Ping() if err != nil { log.Warnf("pre ping: %v", err) return Connectivity{ @@ -192,7 +192,7 @@ func (r *Collector) ping(node cdssdk.Node) Connectivity { var avgDelay time.Duration for i := 0; i < 3; i++ { start := time.Now() - err = agtCli.Ping(*stgglb.Local.NodeID) + err = agtCli.Ping() if err != nil { log.Warnf("ping: %v", err) return Connectivity{ diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index 1e70002..4237f4f 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -390,8 +390,6 @@ type PingReq struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - - FromNodeID int64 `protobuf:"varint,1,opt,name=FromNodeID,proto3" json:"FromNodeID,omitempty"` } func (x *PingReq) Reset() { @@ -426,13 +424,6 @@ func (*PingReq) Descriptor() ([]byte, []int) { return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{6} } -func (x *PingReq) GetFromNodeID() int64 { - if x != nil { - return x.FromNodeID - } - return 0 -} - type PingResp struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -500,10 +491,8 @@ var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x22, 0x29, 0x0a, 0x07, - 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72, 0x6f, 0x6d, 0x4e, - 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x46, 0x72, 0x6f, - 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x22, 0x09, 0x0a, 0x07, + 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x2a, 0x37, 0x0a, 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index 7f08bde..696d30a 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -40,9 +40,7 @@ message FetchStreamReq { string StreamID = 2; } -message PingReq { - int64 FromNodeID = 1; -} +message PingReq {} message PingResp {} service Agent { diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index 74ea749..8b570c4 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -210,10 +209,8 @@ func (c *Client) FetchStream(planID ioswitch.PlanID, streamID ioswitch.StreamID) }, nil } -func (c *Client) Ping(fromNodeID cdssdk.NodeID) error { - _, err := c.cli.Ping(context.Background(), &PingReq{ - FromNodeID: int64(fromNodeID), - }) +func (c *Client) Ping() error { + _, err := c.cli.Ping(context.Background(), &PingReq{}) return err } From c95adff0dd48d5426bb6d8da343ae8aea03ebf2d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 21 Mar 2024 10:15:57 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/cache.go | 5 +++++ client/internal/cmdline/object.go | 5 +++++ client/internal/cmdline/package.go | 6 ++++++ client/internal/cmdline/storage.go | 10 ++++++++++ 4 files changed, 26 insertions(+) diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index 08ab896..ac32e92 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -8,6 +8,11 @@ import ( ) func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + startTime := time.Now() + defer func() { + fmt.Printf("%v\n", time.Since(startTime).Seconds()) + }() + taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(1, packageID, nodeID) if err != nil { return fmt.Errorf("start cache moving package: %w", err) diff --git a/client/internal/cmdline/object.go b/client/internal/cmdline/object.go index fad17cf..8c36a58 100644 --- a/client/internal/cmdline/object.go +++ b/client/internal/cmdline/object.go @@ -11,6 +11,11 @@ import ( ) var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath string, nodeAffinity []cdssdk.NodeID) error { + startTime := time.Now() + defer func() { + fmt.Printf("%v\n", time.Since(startTime).Seconds()) + }() + userID := cdssdk.UserID(1) var uploadFilePathes []string diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index a15bdbe..eb980fe 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -5,6 +5,7 @@ import ( "io" "os" "path/filepath" + "time" "github.com/jedib0t/go-pretty/v6/table" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -33,6 +34,11 @@ func PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID) err } func PackageDownloadPackage(ctx CommandContext, packageID cdssdk.PackageID, outputDir string) error { + startTime := time.Now() + defer func() { + fmt.Printf("%v\n", time.Since(startTime).Seconds()) + }() + userID := cdssdk.UserID(1) err := os.MkdirAll(outputDir, os.ModePerm) diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index e47665c..e2c65a6 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -8,6 +8,11 @@ import ( ) func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error { + startTime := time.Now() + defer func() { + fmt.Printf("%v\n", time.Since(startTime).Seconds()) + }() + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(1, packageID, storageID) if err != nil { return fmt.Errorf("start loading package to storage: %w", err) @@ -31,6 +36,11 @@ func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageI } func StorageCreatePackage(ctx CommandContext, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string) error { + startTime := time.Now() + defer func() { + fmt.Printf("%v\n", time.Since(startTime).Seconds()) + }() + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageCreatePackage(1, bucketID, name, storageID, path, nil) if err != nil { return fmt.Errorf("start storage uploading package: %w", err) From 9a19e063f8b04d973a4fade3ccc9e26ecc96a3b8 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 21 Mar 2024 10:36:30 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E6=89=80=E6=9C=89=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E6=89=A7=E8=A1=8Csql=E8=AF=AD=E5=8F=A5=E7=9A=84=E5=87=BD?= =?UTF-8?q?=E6=95=B0=E9=9C=80=E8=A6=81=E5=88=A4=E6=96=AD=E5=8F=82=E6=95=B0?= =?UTF-8?q?=E9=95=BF=E5=BA=A6=E6=98=AF=E5=90=A6=E4=B8=BA0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/db/cache.go | 11 ++++++++++ common/pkgs/db/node_connectivity.go | 8 ++++++++ common/pkgs/db/object.go | 31 ++++++++++++++++++++++++----- common/pkgs/db/object_block.go | 12 +++++++++++ common/pkgs/db/pinned_object.go | 18 ++++++++++++++++- common/pkgs/db/utils.go | 4 ++-- 6 files changed, 76 insertions(+), 8 deletions(-) diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index 84ff1ca..4979228 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -46,6 +46,9 @@ func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, pr // 批量创建缓存记录 func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { + if len(caches) == 0 { + return nil + } return BatchNamedExec( ctx, "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ @@ -57,6 +60,10 @@ func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { } func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { + if len(fileHashes) == 0 { + return nil + } + var caches []model.Cache var nowTime = time.Now() for _, hash := range fileHashes { @@ -78,6 +85,10 @@ func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeI } func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { + if len(fileHashes) == 0 { + return nil + } + // TODO in语句有长度限制 query, args, err := sqlx.In("delete from Cache where NodeID = ? and FileHash in (?)", nodeID, fileHashes) if err != nil { diff --git a/common/pkgs/db/node_connectivity.go b/common/pkgs/db/node_connectivity.go index ffc3307..8461563 100644 --- a/common/pkgs/db/node_connectivity.go +++ b/common/pkgs/db/node_connectivity.go @@ -15,6 +15,10 @@ func (db *DB) NodeConnectivity() *NodeConnectivityDB { } func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) { + if len(nodeIDs) == 0 { + return nil, nil + } + var ret []model.NodeConnectivity sql, args, err := sqlx.In("select * from NodeConnectivity where NodeID in (?)", nodeIDs) @@ -26,6 +30,10 @@ func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssd } func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.NodeConnectivity) error { + if len(cons) == 0 { + return nil + } + return BatchNamedExec(ctx, "insert into NodeConnectivity(FromNodeID, ToNodeID, Delay, TestTime) values(:FromNodeID, :ToNodeID, :Delay, :TestTime) as new"+ " on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil) diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 20c1afb..3655c98 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -27,6 +27,10 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj } func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) { + if len(pathes) == 0 { + return nil, nil + } + // TODO In语句 stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) if err != nil { @@ -63,6 +67,10 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, // 用于创建时,需要额外检查PackageID+Path的唯一性 // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { + if len(objs) == 0 { + return nil + } + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + " values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + " on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" @@ -129,6 +137,10 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac } func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { + if len(adds) == 0 { + return nil, nil + } + objs := make([]cdssdk.Object, 0, len(adds)) for _, add := range adds { objs = append(objs, cdssdk.Object{ @@ -175,7 +187,6 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] FileHash: add.FileHash, }) } - err = db.ObjectBlock().BatchCreate(ctx, objBlocks) if err != nil { return nil, fmt.Errorf("batch create object blocks: %w", err) @@ -190,7 +201,6 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] Priority: 0, }) } - err = db.Cache().BatchCreate(ctx, caches) if err != nil { return nil, fmt.Errorf("batch create caches: %w", err) @@ -200,6 +210,11 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] } func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { + if len(objs) == 0 { + return nil + } + + nowTime := time.Now() objIDs := make([]cdssdk.ObjectID, 0, len(objs)) dummyObjs := make([]cdssdk.Object, 0, len(objs)) for _, obj := range objs { @@ -207,14 +222,16 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb dummyObjs = append(dummyObjs, cdssdk.Object{ ObjectID: obj.ObjectID, Redundancy: obj.Redundancy, + CreateTime: nowTime, + UpdateTime: nowTime, }) } // 目前只能使用这种方式来同时更新大量数据 err := BatchNamedExec(ctx, - "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, UpdateTime)"+ - " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :UpdateTime) as new"+ - " on duplicate key update Redundancy=new.Redundancy", 7, dummyObjs, nil) + "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime)"+ + " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :CreateTime, :UpdateTime) as new"+ + " on duplicate key update Redundancy=new.Redundancy", 8, dummyObjs, nil) if err != nil { return fmt.Errorf("batch update object redundancy: %w", err) } @@ -275,6 +292,10 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb } func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { + if len(ids) == 0 { + return nil + } + query, args, err := sqlx.In("delete from Object where ObjectID in (?)", ids) if err != nil { return err diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index f420bfb..a0638e8 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -30,6 +30,10 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index } func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { + if len(blocks) == 0 { + return nil + } + return BatchNamedExec(ctx, "insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)", 4, @@ -44,6 +48,10 @@ func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.Object } func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + if len(objectIDs) == 0 { + return nil + } + // TODO in语句有长度限制 query, args, err := sqlx.In("delete from ObjectBlock where ObjectID in (?)", objectIDs) if err != nil { @@ -59,6 +67,10 @@ func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.Packag } func (db *ObjectBlockDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { + if len(fileHashes) == 0 { + return nil + } + query, args, err := sqlx.In("delete from ObjectBlock where NodeID = ? and FileHash in (?)", nodeID, fileHashes) if err != nil { return err diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 6853315..d3c7d7b 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -40,6 +40,10 @@ func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID } func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { + if len(pinneds) == 0 { + return nil + } + return BatchNamedExec(ctx, "insert ignore into PinnedObject values(:NodeID,:ObjectID,:CreateTime)", 3, pinneds, nil) } @@ -54,6 +58,10 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag } func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, nodeIDs []cdssdk.NodeID) error { + if len(nodeIDs) == 0 { + return nil + } + for _, id := range nodeIDs { err := db.TryCreate(ctx, id, objectID, time.Now()) if err != nil { @@ -74,6 +82,10 @@ func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID } func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + if len(objectIDs) == 0 { + return nil + } + // TODO in语句有长度限制 query, args, err := sqlx.In("delete from PinnedObject where ObjectID in (?)", objectIDs) if err != nil { @@ -94,7 +106,11 @@ func (*PinnedObjectDB) DeleteInPackageAtNode(ctx SQLContext, packageID cdssdk.Pa } func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { - query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) + if len(objectIDs) == 0 { + return nil + } + + query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", nodeID, objectIDs) if err != nil { return err } diff --git a/common/pkgs/db/utils.go b/common/pkgs/db/utils.go index 5dde56f..2614355 100644 --- a/common/pkgs/db/utils.go +++ b/common/pkgs/db/utils.go @@ -31,7 +31,7 @@ func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, call ret, err := ctx.NamedExec(sql, arr[:curBatchSize]) if err != nil { - return nil + return err } if callback != nil && !callback(ret) { return nil @@ -63,7 +63,7 @@ func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, cal ret, err := ctx.NamedQuery(sql, arr[:curBatchSize]) if err != nil { - return nil + return err } if callback != nil && !callback(ret) { return nil From ec3e9b0a35e0fe99006d5b91ef77f234b521670a Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 25 Mar 2024 11:31:03 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E7=BB=99scanner=E7=9A=84=E9=83=A8=E5=88=86?= =?UTF-8?q?event=E5=A2=9E=E5=8A=A0=E6=89=A7=E8=A1=8C=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scanner/internal/event/agent_cache_gc.go | 5 ++++- scanner/internal/event/agent_check_cache.go | 6 ++++-- scanner/internal/event/agent_storage_gc.go | 5 ++++- scanner/internal/event/check_package_redundancy.go | 5 ++++- scanner/internal/event/clean_pinned.go | 6 +++++- 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/scanner/internal/event/agent_cache_gc.go b/scanner/internal/event/agent_cache_gc.go index 22e5521..f37bff8 100644 --- a/scanner/internal/event/agent_cache_gc.go +++ b/scanner/internal/event/agent_cache_gc.go @@ -40,8 +40,11 @@ func (t *AgentCacheGC) TryMerge(other Event) bool { func (t *AgentCacheGC) Execute(execCtx ExecuteContext) { log := logger.WithType[AgentCacheGC]("Event") + startTime := time.Now() log.Debugf("begin with %v", logger.FormatStruct(t.AgentCacheGC)) - defer log.Debugf("end") + defer func() { + log.Debugf("end, time: %v", time.Since(startTime)) + }() // TODO unavailable的节点需不需要发送任务? diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_cache.go index 7f4cd45..17bc18f 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_cache.go @@ -40,9 +40,11 @@ func (t *AgentCheckCache) TryMerge(other Event) bool { func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { log := logger.WithType[AgentCheckCache]("Event") + startTime := time.Now() log.Debugf("begin with %v", logger.FormatStruct(t.AgentCheckCache)) - defer log.Debugf("end") - + defer func() { + log.Debugf("end, time: %v", time.Since(startTime)) + }() // TODO unavailable的节点需不需要发送任务? agtCli, err := stgglb.AgentMQPool.Acquire(t.NodeID) diff --git a/scanner/internal/event/agent_storage_gc.go b/scanner/internal/event/agent_storage_gc.go index c89e9b8..c3c22ad 100644 --- a/scanner/internal/event/agent_storage_gc.go +++ b/scanner/internal/event/agent_storage_gc.go @@ -37,8 +37,11 @@ func (t *AgentStorageGC) TryMerge(other Event) bool { func (t *AgentStorageGC) Execute(execCtx ExecuteContext) { log := logger.WithType[AgentStorageGC]("Event") + startTime := time.Now() log.Debugf("begin with %v", logger.FormatStruct(t.AgentStorageGC)) - defer log.Debugf("end") + defer func() { + log.Debugf("end, time: %v", time.Since(startTime)) + }() // TODO unavailable的节点需不需要发送任务? diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 9d82041..0a6eee0 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -51,8 +51,11 @@ func (t *CheckPackageRedundancy) TryMerge(other Event) bool { func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { log := logger.WithType[CheckPackageRedundancy]("Event") + startTime := time.Now() log.Debugf("begin with %v", logger.FormatStruct(t.CheckPackageRedundancy)) - defer log.Debugf("end") + defer func() { + log.Debugf("end, time: %v", time.Since(startTime)) + }() coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 4957770..67e4599 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -5,6 +5,7 @@ import ( "math" "math/rand" "sync" + "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" @@ -44,8 +45,11 @@ func (t *CleanPinned) TryMerge(other Event) bool { func (t *CleanPinned) Execute(execCtx ExecuteContext) { log := logger.WithType[CleanPinned]("Event") + startTime := time.Now() log.Debugf("begin with %v", logger.FormatStruct(t.CleanPinned)) - defer log.Debugf("end") + defer func() { + log.Debugf("end, time: %v", time.Since(startTime)) + }() coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil {