From 7b582c2ee32a572638008c62ae56035fbf6f9ac0 Mon Sep 17 00:00:00 2001 From: songjc <969378911@qq.com> Date: Tue, 25 Jul 2023 15:23:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=8A=E4=BC=A0=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=A4=B9=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/cmdline/object.go | 84 ++++++++++++++++- internal/services/object.go | 11 +-- internal/task/upload_rep_object.go | 143 ++++++++++++++++++++++------- 3 files changed, 196 insertions(+), 42 deletions(-) diff --git a/internal/cmdline/object.go b/internal/cmdline/object.go index b69c377..9a04c92 100644 --- a/internal/cmdline/object.go +++ b/internal/cmdline/object.go @@ -5,10 +5,12 @@ import ( "io" "os" "path/filepath" + "strings" "time" "github.com/jedib0t/go-pretty/v6/table" "github.com/juju/ratelimit" + "gitlink.org.cn/cloudream/client/internal/task" myio "gitlink.org.cn/cloudream/common/utils/io" ) @@ -86,24 +88,29 @@ func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID in // TODO 测试用 bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024) - taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObject(0, bucketID, objectName, - myio.WithCloser(ratelimit.Reader(file, bkt), + uploadObject := task.UploadObject{ + ObjectName: objectName, + File: myio.WithCloser(ratelimit.Reader(file, bkt), func(reader io.Reader) error { return file.Close() }), - fileSize, repCount) + FileSize: fileSize, + } + uploadObjects := []task.UploadObject{uploadObject} + + taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadObjects, repCount) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) } for { - complete, fileHash, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObject(taskID, time.Second*5) + complete, UploadRepResults, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) if complete { if err != nil { return fmt.Errorf("uploading rep object: %w", err) } - fmt.Print(fileHash) + fmt.Print(UploadRepResults[0].ResultFileHash) return nil } @@ -113,6 +120,71 @@ func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID in } } +func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID int, repCount int) error { + + var uploadFiles []task.UploadObject + var uploadFile task.UploadObject + err := filepath.Walk(localDirPath, func(fname string, fi os.FileInfo, err error) error { + if !fi.IsDir() { + file, err := os.Open(fname) + if err != nil { + return fmt.Errorf("open file %s failed, err: %w", fname, err) + } + // TODO 测试用 + bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024) + uploadFile = task.UploadObject{ + ObjectName: strings.Replace(fname, "\\", "/", -1), + File: myio.WithCloser(ratelimit.Reader(file, bkt), + func(reader io.Reader) error { + return file.Close() + }), + FileSize: fi.Size(), + } + uploadFiles = append(uploadFiles, uploadFile) + } + return nil + }) + if err != nil { + return fmt.Errorf("open directory %s failed, err: %w", localDirPath, err) + } + + // 遍历 关闭文件流 + for _, uploadFile := range uploadFiles { + defer uploadFile.File.Close() + } + + taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadFiles, repCount) + if err != nil { + return fmt.Errorf("upload file data failed, err: %w", err) + } + + for { + complete, UploadRepResults, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) + if complete { + + tb := table.NewWriter() + tb.AppendHeader(table.Row{"ObjectID", "FileHash"}) + + for _, uploadRepResult := range UploadRepResults { + tb.AppendRow(table.Row{uploadRepResult.ObjectID, uploadRepResult.ResultFileHash}) + } + + fmt.Print(tb.Render()) + + if err != nil { + return fmt.Errorf("uploading rep object: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("wait uploading: %w", err) + } + } + +} + func ObjectEcWrite(ctx CommandContext, localFilePath string, bucketID int, objectName string, ecName string) error { // TODO panic("not implement yet") @@ -174,6 +246,8 @@ func init() { commands.MustAdd(ObjectUploadRepObject, "object", "new", "rep") + commands.MustAdd(ObjectUploadRepObjectDir, "object", "new", "dir") + commands.MustAdd(ObjectDownloadObject, "object", "get") commands.MustAdd(ObjectUpdateRepObject, "object", "update", "rep") diff --git a/internal/services/object.go b/internal/services/object.go index 9ce4e14..d61b80d 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -176,18 +176,17 @@ func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, return reader, nil } -func (svc *ObjectService) StartUploadingRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) (string, error) { - tsk := svc.taskMgr.StartNew(task.NewUploadRepObject(userID, bucketID, objectName, file, fileSize, repCount)) +func (svc *ObjectService) StartUploadingRepObjects(userID int, bucketID int, uploadObjects []task.UploadObject, repCount int) (string, error) { + tsk := svc.taskMgr.StartNew(task.NewUploadRepObject(userID, bucketID, uploadObjects, repCount)) return tsk.ID(), nil } -func (svc *ObjectService) WaitUploadingRepObject(taskID string, waitTimeout time.Duration) (bool, string, error) { +func (svc *ObjectService) WaitUploadingRepObjects(taskID string, waitTimeout time.Duration) (bool, []task.UploadRepResult, error) { tsk := svc.taskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - return true, tsk.Body().(*task.UploadRepObject).ResultFileHash, tsk.Error() + return true, tsk.Body().(*task.UploadRepObject).UploadRepResults, tsk.Error() } - - return false, "", nil + return false, nil, nil } func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error { diff --git a/internal/task/upload_rep_object.go b/internal/task/upload_rep_object.go index 486ca31..c2a9c9e 100644 --- a/internal/task/upload_rep_object.go +++ b/internal/task/upload_rep_object.go @@ -23,43 +23,57 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +// UploadObjects和UploadRepResults一一对应 type UploadRepObject struct { - userID int - bucketID int - objectName string - file io.ReadCloser - fileSize int64 - repCount int + userID int + bucketID int + repCount int + UploadObjects []UploadObject + UploadRepResults []UploadRepResult +} + +type UploadObject struct { + ObjectName string + File io.ReadCloser + FileSize int64 +} +type UploadRepResult struct { + Error error ResultFileHash string + ObjectID int64 } -func NewUploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) *UploadRepObject { +func NewUploadRepObject(userID int, bucketID int, UploadObjects []UploadObject, repCount int) *UploadRepObject { return &UploadRepObject{ - userID: userID, - bucketID: bucketID, - objectName: objectName, - file: file, - fileSize: fileSize, - repCount: repCount, + userID: userID, + bucketID: bucketID, + UploadObjects: UploadObjects, + repCount: repCount, } } func (t *UploadRepObject) Execute(ctx TaskContext, complete CompleteFn) { - fileHash, err := t.do(ctx) - t.ResultFileHash = fileHash + + UploadRepResults, err := t.do(ctx) + t.UploadRepResults = UploadRepResults complete(err, CompleteOption{ RemovingDelay: time.Minute, }) } -func (t *UploadRepObject) do(ctx TaskContext) (string, error) { - mutex, err := reqbuilder.NewBuilder(). +func (t *UploadRepObject) do(ctx TaskContext) ([]UploadRepResult, error) { + + reqBlder := reqbuilder.NewBuilder() + for _, uploadObject := range t.UploadObjects { + reqBlder.Metadata(). + // 用于防止创建了多个同名对象 + Object().CreateOne(t.bucketID, uploadObject.ObjectName) + } + mutex, err := reqBlder. Metadata(). // 用于判断用户是否有桶的权限 UserBucket().ReadOne(t.userID, t.bucketID). - // 用于防止创建了多个同名对象 - Object().CreateOne(t.bucketID, t.objectName). // 用于查询可用的上传节点 Node().ReadAny(). // 用于设置Rep配置 @@ -68,20 +82,86 @@ func (t *UploadRepObject) do(ctx TaskContext) (string, error) { Cache().CreateAny(). MutexLock(ctx.DistLock) if err != nil { - return "", fmt.Errorf("acquire locks failed, err: %w", err) + return nil, fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex.Unlock() + uploadObjs := []struct { + UploadObject UploadObject + nodes []ramsg.RespNode + }{} + + for _, uploadObject := range t.UploadObjects { + nodes, err := t.preUploadSingleObject(ctx, uploadObject) + if err != nil { + // 不满足上传条件,直接记录结果 + result := UploadRepResult{ + Error: err, + ResultFileHash: "", + ObjectID: 0, + } + t.UploadRepResults = append(t.UploadRepResults, result) + continue + } + + obj := struct { + UploadObject UploadObject + nodes []ramsg.RespNode + }{ + UploadObject: uploadObject, + nodes: nodes, + } + uploadObjs = append(uploadObjs, obj) + } + + // 不满足上传条件,返回结果 + if len(uploadObjs) != len(t.UploadObjects) { + return t.UploadRepResults, fmt.Errorf("Folder does not meet the upload requirements.") + } + + //上传文件夹 + for _, uploadObj := range uploadObjs { + objectID, fileHash, err := t.uploadSingleObject(ctx, uploadObj.UploadObject, uploadObj.nodes) + if err != nil { + // 上传文件时出现错误,记录结果 + result := UploadRepResult{ + Error: err, + ResultFileHash: "", + ObjectID: 0, + } + t.UploadRepResults = append(t.UploadRepResults, result) + continue + } + + // 文件上传成功,记录结果 + result := UploadRepResult{ + Error: err, + ResultFileHash: fileHash, + ObjectID: objectID, + } + t.UploadRepResults = append(t.UploadRepResults, result) + } + return t.UploadRepResults, nil +} + +// 检查单个文件是否能够上传 +func (t *UploadRepObject) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) ([]ramsg.RespNode, error) { //发送写请求,请求Coor分配写入节点Ip - repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, t.objectName, t.fileSize, t.userID, config.Cfg().ExternalIP)) + // fmt.Printf("uploadObject: %v\n", uploadObject) + repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP)) if err != nil { - return "", fmt.Errorf("pre upload rep object: %w", err) + return nil, fmt.Errorf("pre upload rep object: %w", err) } if len(repWriteResp.Nodes) == 0 { - return "", fmt.Errorf("no node to upload file") + return nil, fmt.Errorf("no node to upload file") } + return repWriteResp.Nodes, nil +} + +// 上传文件 +func (t *UploadRepObject) uploadSingleObject(ctx TaskContext, uploadObject UploadObject, nodes []ramsg.RespNode) (int64, string, error) { - uploadNode := t.chooseUploadNode(repWriteResp.Nodes) + uploadNode := t.chooseUploadNode(nodes) var fileHash string uploadedNodeIDs := []int{} @@ -90,7 +170,8 @@ func (t *UploadRepObject) do(ctx TaskContext) (string, error) { if ctx.IPFS != nil { logger.Infof("try to use local IPFS to upload file") - fileHash, err = uploadToLocalIPFS(ctx.IPFS, t.file, uploadNode.ID) + var err error + fileHash, err = uploadToLocalIPFS(ctx.IPFS, uploadObject.File, uploadNode.ID) if err != nil { logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error()) } else { @@ -113,24 +194,24 @@ func (t *UploadRepObject) do(ctx TaskContext) (string, error) { IPFS().CreateAnyRep(uploadNode.ID). MutexLock(ctx.DistLock) if err != nil { - return "", fmt.Errorf("acquire locks failed, err: %w", err) + return 0, "", fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex.Unlock() - fileHash, err = uploadToNode(t.file, nodeIP) + fileHash, err = uploadToNode(uploadObject.File, nodeIP) if err != nil { - return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) + return 0, "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) } uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID) } // 记录写入的文件的Hash - _, err = ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, t.objectName, t.fileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash)) + createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash)) if err != nil { - return "", fmt.Errorf("creating rep object: %w", err) + return 0, "", fmt.Errorf("creating rep object: %w", err) } - return fileHash, nil + return createResp.ObjectID, fileHash, nil } // chooseUploadNode 选择一个上传文件的节点