diff --git a/internal/cmdline/object.go b/internal/cmdline/object.go index 9a04c92..f0dff70 100644 --- a/internal/cmdline/object.go +++ b/internal/cmdline/object.go @@ -73,6 +73,44 @@ func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int return nil } +func ObjectDownloadObjectDir(ctx CommandContext, localFilePath string, dirName string) error { + /* // 创建本地文件夹 + curExecPath, err := os.Executable() + if err != nil { + return fmt.Errorf("get executable directory failed, err: %w", err) + } + + outputFilePath := filepath.Join(filepath.Dir(curExecPath), localFilePath) + outputFileDir := filepath.Dir(outputFilePath) + + err = os.MkdirAll(outputFileDir, os.ModePerm) + if err != nil { + return fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err) + } + + outputFile, err := os.Create(outputFilePath) + if err != nil { + return fmt.Errorf("create output file %s failed, err: %w", outputFilePath, err) + } + defer outputFile.Close() + + // 下载文件 + reader, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObject(0, objectID) + if err != nil { + return fmt.Errorf("download object failed, err: %w", err) + } + defer reader.Close() + + bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024) + _, err = io.Copy(outputFile, ratelimit.Reader(reader, bkt)) + if err != nil { + // TODO 写入到文件失败,是否要考虑删除这个不完整的文件? + return fmt.Errorf("copy object data to local file failed, err: %w", err) + } */ + + return nil +} + func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID int, objectName string, repCount int) error { file, err := os.Open(localFilePath) if err != nil { @@ -104,13 +142,13 @@ func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID in } for { - complete, UploadRepResults, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) + complete, UploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) if complete { if err != nil { return fmt.Errorf("uploading rep object: %w", err) } - fmt.Print(UploadRepResults[0].ResultFileHash) + fmt.Print(UploadObjectResult.UploadRepResults[0].ResultFileHash) return nil } @@ -149,32 +187,45 @@ func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID } // 遍历 关闭文件流 - for _, uploadFile := range uploadFiles { - defer uploadFile.File.Close() - } + defer func() { + for _, uploadFile := range uploadFiles { + uploadFile.File.Close() + } + }() taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadFiles, repCount) + if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) } for { - complete, UploadRepResults, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) + complete, UploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5) if complete { - - tb := table.NewWriter() - tb.AppendHeader(table.Row{"ObjectID", "FileHash"}) - - for _, uploadRepResult := range UploadRepResults { - tb.AppendRow(table.Row{uploadRepResult.ObjectID, uploadRepResult.ResultFileHash}) - } - - fmt.Print(tb.Render()) - if err != nil { return fmt.Errorf("uploading rep object: %w", err) } + tb := table.NewWriter() + if UploadObjectResult.IsUploading { + + tb.AppendHeader(table.Row{"ObjectID", "ObjectName", "FileHash"}) + for i := 0; i < len(UploadObjectResult.UploadObjects); i++ { + tb.AppendRow(table.Row{UploadObjectResult.UploadRepResults[i].ObjectID, UploadObjectResult.UploadObjects[i].ObjectName, UploadObjectResult.UploadRepResults[i].ResultFileHash}) + } + fmt.Print(tb.Render()) + + } else { + fmt.Println("The folder upload failed. Some files do not meet the upload requirements.") + + tb.AppendHeader(table.Row{"ObjectName", "Error"}) + for i := 0; i < len(UploadObjectResult.UploadObjects); i++ { + if UploadObjectResult.UploadRepResults[i].Error != nil { + tb.AppendRow(table.Row{UploadObjectResult.UploadObjects[i].ObjectName, UploadObjectResult.UploadRepResults[i].Error}) + } + } + fmt.Print(tb.Render()) + } return nil } @@ -250,6 +301,8 @@ func init() { commands.MustAdd(ObjectDownloadObject, "object", "get") + commands.MustAdd(ObjectDownloadObjectDir, "object", "get", "dir") + commands.MustAdd(ObjectUpdateRepObject, "object", "update", "rep") commands.MustAdd(ObjectDeleteObject, "object", "delete") diff --git a/internal/services/object.go b/internal/services/object.go index d61b80d..985383c 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -181,12 +181,18 @@ func (svc *ObjectService) StartUploadingRepObjects(userID int, bucketID int, upl return tsk.ID(), nil } -func (svc *ObjectService) WaitUploadingRepObjects(taskID string, waitTimeout time.Duration) (bool, []task.UploadRepResult, error) { +func (svc *ObjectService) WaitUploadingRepObjects(taskID string, waitTimeout time.Duration) (bool, task.UploadObjectResult, error) { tsk := svc.taskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - return true, tsk.Body().(*task.UploadRepObject).UploadRepResults, tsk.Error() + uploadObjectResult := task.UploadObjectResult{ + UploadObjects: tsk.Body().(*task.UploadRepObject).UploadObjects, + UploadRepResults: tsk.Body().(*task.UploadRepObject).UploadRepResults, + IsUploading: tsk.Body().(*task.UploadRepObject).IsUploading, + } + + return true, uploadObjectResult, tsk.Error() } - return false, nil, nil + return false, task.UploadObjectResult{}, nil } func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error { diff --git a/internal/task/upload_rep_object.go b/internal/task/upload_rep_object.go index c2a9c9e..8bd66c3 100644 --- a/internal/task/upload_rep_object.go +++ b/internal/task/upload_rep_object.go @@ -23,13 +23,20 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -// UploadObjects和UploadRepResults一一对应 +// UploadObjects和UploadRepResults为一一对应关系 type UploadRepObject struct { userID int bucketID int repCount int UploadObjects []UploadObject UploadRepResults []UploadRepResult + IsUploading bool +} + +type UploadObjectResult struct { + UploadObjects []UploadObject + UploadRepResults []UploadRepResult + IsUploading bool } type UploadObject struct { @@ -54,15 +61,13 @@ func NewUploadRepObject(userID int, bucketID int, UploadObjects []UploadObject, } func (t *UploadRepObject) Execute(ctx TaskContext, complete CompleteFn) { - - UploadRepResults, err := t.do(ctx) - t.UploadRepResults = UploadRepResults + err := t.do(ctx) complete(err, CompleteOption{ RemovingDelay: time.Minute, }) } -func (t *UploadRepObject) do(ctx TaskContext) ([]UploadRepResult, error) { +func (t *UploadRepObject) do(ctx TaskContext) error { reqBlder := reqbuilder.NewBuilder() for _, uploadObject := range t.UploadObjects { @@ -82,72 +87,52 @@ func (t *UploadRepObject) do(ctx TaskContext) ([]UploadRepResult, error) { Cache().CreateAny(). MutexLock(ctx.DistLock) if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) + return fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex.Unlock() - uploadObjs := []struct { - UploadObject UploadObject - nodes []ramsg.RespNode - }{} + var repWriteResps []*coormsg.PreUploadResp - for _, uploadObject := range t.UploadObjects { - nodes, err := t.preUploadSingleObject(ctx, uploadObject) + //判断是否所有文件都符合上传条件 + flag := true + for i := 0; i < len(t.UploadObjects); i++ { + repWriteResp, err := t.preUploadSingleObject(ctx, t.UploadObjects[i]) if err != nil { - // 不满足上传条件,直接记录结果 - result := UploadRepResult{ - Error: err, - ResultFileHash: "", - ObjectID: 0, - } - t.UploadRepResults = append(t.UploadRepResults, result) + flag = false + t.UploadRepResults = append(t.UploadRepResults, + UploadRepResult{ + Error: err, + ResultFileHash: "", + ObjectID: 0, + }) continue } - - obj := struct { - UploadObject UploadObject - nodes []ramsg.RespNode - }{ - UploadObject: uploadObject, - nodes: nodes, - } - uploadObjs = append(uploadObjs, obj) + t.UploadRepResults = append(t.UploadRepResults, UploadRepResult{}) + repWriteResps = append(repWriteResps, repWriteResp) } - // 不满足上传条件,返回结果 - if len(uploadObjs) != len(t.UploadObjects) { - return t.UploadRepResults, fmt.Errorf("Folder does not meet the upload requirements.") + // 不满足上传条件,返回各文件检查结果 + if !flag { + return nil } //上传文件夹 - for _, uploadObj := range uploadObjs { - objectID, fileHash, err := t.uploadSingleObject(ctx, uploadObj.UploadObject, uploadObj.nodes) - if err != nil { - // 上传文件时出现错误,记录结果 - result := UploadRepResult{ - Error: err, - ResultFileHash: "", - ObjectID: 0, - } - t.UploadRepResults = append(t.UploadRepResults, result) - continue - } - - // 文件上传成功,记录结果 - result := UploadRepResult{ + t.IsUploading = true + for i := 0; i < len(repWriteResps); i++ { + objectID, fileHash, err := t.uploadSingleObject(ctx, t.UploadObjects[i], repWriteResps[i].Nodes) + // 记录文件上传结果 + t.UploadRepResults[i] = UploadRepResult{ Error: err, ResultFileHash: fileHash, ObjectID: objectID, } - t.UploadRepResults = append(t.UploadRepResults, result) } - return t.UploadRepResults, nil + return nil } // 检查单个文件是否能够上传 -func (t *UploadRepObject) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) ([]ramsg.RespNode, error) { +func (t *UploadRepObject) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) { //发送写请求,请求Coor分配写入节点Ip - // fmt.Printf("uploadObject: %v\n", uploadObject) repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP)) if err != nil { return nil, fmt.Errorf("pre upload rep object: %w", err) @@ -155,7 +140,7 @@ func (t *UploadRepObject) preUploadSingleObject(ctx TaskContext, uploadObject Up if len(repWriteResp.Nodes) == 0 { return nil, fmt.Errorf("no node to upload file") } - return repWriteResp.Nodes, nil + return repWriteResp, nil } // 上传文件