diff --git a/internal/cmdline/object.go b/internal/cmdline/object.go index 0f8c900..53c6de5 100644 --- a/internal/cmdline/object.go +++ b/internal/cmdline/object.go @@ -5,10 +5,10 @@ import ( "io" "os" "path/filepath" - "strings" "time" "github.com/jedib0t/go-pretty/v6/table" + "github.com/juju/ratelimit" "gitlink.org.cn/cloudream/client/internal/task" ) @@ -35,22 +35,16 @@ func ObjectListBucketObjects(ctx CommandContext, bucketID int64) error { func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int64) error { // 创建本地文件 - curExecPath, err := os.Executable() - if err != nil { - return fmt.Errorf("get executable directory failed, err: %w", err) - } + outputFileDir := filepath.Dir(localFilePath) - outputFilePath := filepath.Join(filepath.Dir(curExecPath), localFilePath) - outputFileDir := filepath.Dir(outputFilePath) - - err = os.MkdirAll(outputFileDir, os.ModePerm) + err := os.MkdirAll(outputFileDir, os.ModePerm) if err != nil { return fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err) } - outputFile, err := os.Create(outputFilePath) + outputFile, err := os.Create(localFilePath) if err != nil { - return fmt.Errorf("create output file %s failed, err: %w", outputFilePath, err) + return fmt.Errorf("create output file %s failed, err: %w", localFilePath, err) } defer outputFile.Close() @@ -69,40 +63,54 @@ func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int return nil } -func ObjectDownloadObjectDir(ctx CommandContext, localFilePath string, dirName string) error { - /* // 创建本地文件夹 - curExecPath, err := os.Executable() +func ObjectDownloadObjectDir(ctx CommandContext, outputBaseDir string, dirName string) error { + // 创建本地文件夹 + err := os.MkdirAll(outputBaseDir, os.ModePerm) if err != nil { - return fmt.Errorf("get executable directory failed, err: %w", err) + return fmt.Errorf("create output base directory %s failed, err: %w", outputBaseDir, err) } - outputFilePath := filepath.Join(filepath.Dir(curExecPath), localFilePath) - outputFileDir := filepath.Dir(outputFilePath) - - err = os.MkdirAll(outputFileDir, os.ModePerm) + // 下载文件夹 + resObjs, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObjectDir(0, dirName) if err != nil { - return fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err) + return fmt.Errorf("download folder failed, err: %w", err) } - outputFile, err := os.Create(outputFilePath) - if err != nil { - return fmt.Errorf("create output file %s failed, err: %w", outputFilePath, err) - } - defer outputFile.Close() + // 遍历 关闭文件流 + defer func() { + for _, resObj := range resObjs { + resObj.Reader.Close() + } + }() - // 下载文件 - reader, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObject(0, objectID) - if err != nil { - return fmt.Errorf("download object failed, err: %w", err) - } - defer reader.Close() + for i := 0; i < len(resObjs); i++ { + if resObjs[i].Error != nil { + fmt.Printf("download file %s failed, err: %s", resObjs[i].ObjectName, err.Error()) + continue + } + outputFilePath := filepath.Join(outputBaseDir, resObjs[i].ObjectName) + outputFileDir := filepath.Dir(outputFilePath) + err = os.MkdirAll(outputFileDir, os.ModePerm) + if err != nil { + fmt.Printf("create output file directory %s failed, err: %s", outputFileDir, err.Error()) + continue + } - _, err = io.Copy(outputFile, reader) - if err != nil { - // TODO 写入到文件失败,是否要考虑删除这个不完整的文件? - return fmt.Errorf("copy object data to local file failed, err: %w", err) - } */ + outputFile, err := os.Create(outputFilePath) + if err != nil { + fmt.Printf("create output file %s failed, err: %s", outputFilePath, err.Error()) + continue + } + defer outputFile.Close() + bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024) + _, err = io.Copy(outputFile, ratelimit.Reader(resObjs[i].Reader, bkt)) + if err != nil { + // TODO 写入到文件失败,是否要考虑删除这个不完整的文件? + fmt.Printf("copy object data to local file failed, err: %s", err.Error()) + continue + } + } return nil } @@ -163,7 +171,7 @@ func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID return fmt.Errorf("open file %s failed, err: %w", fname, err) } uploadFile = task.UploadObject{ - ObjectName: strings.Replace(fname, "\\", "/", -1), + ObjectName: filepath.ToSlash(fname), File: file, FileSize: fi.Size(), } @@ -198,9 +206,13 @@ func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID tb := table.NewWriter() if uploadObjectResult.IsUploading { - tb.AppendHeader(table.Row{"ObjectID", "ObjectName", "FileHash"}) + tb.AppendHeader(table.Row{"ObjectName", "ObjectID", "FileHash"}) for i := 0; i < len(uploadObjectResult.Objects); i++ { - tb.AppendRow(table.Row{uploadObjectResult.Results[i].ObjectID, uploadObjectResult.Objects[i].ObjectName, uploadObjectResult.Results[i].FileHash}) + tb.AppendRow(table.Row{ + uploadObjectResult.Objects[i].ObjectName, + uploadObjectResult.Results[i].ObjectID, + uploadObjectResult.Results[i].FileHash, + }) } fmt.Print(tb.Render()) @@ -222,7 +234,6 @@ func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID return fmt.Errorf("wait uploading: %w", err) } } - } func ObjectEcWrite(ctx CommandContext, localFilePath string, bucketID int64, objectName string, ecName string) error { @@ -282,7 +293,7 @@ func init() { commands.MustAdd(ObjectUploadRepObjectDir, "object", "new", "dir") - commands.MustAdd(ObjectDownloadObject, "object", "get") + commands.MustAdd(ObjectDownloadObject, "object", "get", "rep") commands.MustAdd(ObjectDownloadObjectDir, "object", "get", "dir") diff --git a/internal/services/object.go b/internal/services/object.go index 4de7f92..520f62a 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -26,6 +26,12 @@ type ObjectService struct { *Service } +type ResultDownloadObject struct { + ObjectName string + Reader io.ReadCloser + Error error +} + func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } @@ -35,6 +41,51 @@ func (svc *ObjectService) GetObject(userID int64, objectID int64) (model.Object, panic("not implement yet") } +func (svc *ObjectService) DownloadObjectDir(userID int64, dirName string) ([]ResultDownloadObject, error) { + + mutex, err := reqbuilder.NewBuilder(). + // 用于判断用户是否有对象权限 + Metadata().UserBucket().ReadAny(). + // 用于查询可用的下载节点 + Node().ReadAny(). + // 用于读取文件信息 + Object().ReadAny(). + // 用于查询Rep配置 + ObjectRep().ReadAny(). + // 用于查询Block配置 + ObjectBlock().ReadAny(). + // 用于查询包含了副本的节点 + Cache().ReadAny(). + MutexLock(svc.distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + + // TODO 解锁时机需要优化,在所有文件都写入到本地后再解锁 + // 当前是所有文件流全部打开,处理完最后全部关闭,可以考虑加一个迭代器,将文件流逐个打开关闭 + defer mutex.Unlock() + + //根据dirName查询相关的所有文件 + objsResp, err := svc.coordinator.GetObjectsByDirName(coormsg.NewGetObjectsByDirName(userID, dirName)) + if err != nil { + return nil, fmt.Errorf("get objectID by dirName failed: %w", err) + } + if len(objsResp.Objects) == 0 { + return nil, fmt.Errorf("dirName %v is not exist", dirName) + } + + resultDownloadObjects := []ResultDownloadObject{} + for i := 0; i < len(objsResp.Objects); i++ { + reader, err := svc.downloadSingleObject(objsResp.Objects[i].ObjectID, userID) + resultDownloadObjects = append(resultDownloadObjects, ResultDownloadObject{ + ObjectName: objsResp.Objects[i].Name, + Reader: reader, + Error: err, + }) + } + return resultDownloadObjects, nil +} + func (svc *ObjectService) DownloadObject(userID int64, objectID int64) (io.ReadCloser, error) { mutex, err := reqbuilder.NewBuilder(). // 用于判断用户是否有对象权限 @@ -53,17 +104,26 @@ func (svc *ObjectService) DownloadObject(userID int64, objectID int64) (io.ReadC if err != nil { return nil, fmt.Errorf("acquire locks failed, err: %w", err) } + defer mutex.Unlock() + + reader, err := svc.downloadSingleObject(objectID, userID) + + // defer myio.AfterReadClosed(reader, func(closer io.ReadCloser) { + // // TODO 可以考虑在打开了读取流之后就解锁,而不是要等外部读取完毕 + // mutex.Unlock() + // }) + return reader, err +} +func (svc *ObjectService) downloadSingleObject(objectID int64, userID int64) (io.ReadCloser, error) { preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObject(objectID, userID, config.Cfg().ExternalIP)) if err != nil { - mutex.Unlock() return nil, fmt.Errorf("pre download object: %w", err) } switch redundancy := preDownloadResp.Redundancy.(type) { case ramsg.RespRepRedundancyData: if len(redundancy.Nodes) == 0 { - mutex.Unlock() return nil, fmt.Errorf("no node has this file") } @@ -80,21 +140,14 @@ func (svc *ObjectService) DownloadObject(userID int64, objectID int64) (io.ReadC reader, err := svc.downloadRepObject(entry.ID, nodeIP, redundancy.FileHash) if err != nil { - mutex.Unlock() return nil, fmt.Errorf("rep read failed, err: %w", err) } - - return myio.AfterReadClosed(reader, func(closer io.ReadCloser) { - // TODO 可以考虑在打开了读取流之后就解锁,而不是要等外部读取完毕 - mutex.Unlock() - }), nil + return reader, nil //case consts.REDUNDANCY_EC: // TODO EC部分的代码要考虑重构 // ecRead(readResp.FileSize, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName) } - - mutex.Unlock() return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Redundancy) } diff --git a/internal/task/upload_rep_objects.go b/internal/task/upload_rep_objects.go index 40e6df5..2664ecb 100644 --- a/internal/task/upload_rep_objects.go +++ b/internal/task/upload_rep_objects.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/client/internal/config" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" + "gitlink.org.cn/cloudream/common/utils" mygrpc "gitlink.org.cn/cloudream/common/utils/grpc" "gitlink.org.cn/cloudream/common/utils/ipfs" @@ -189,8 +190,10 @@ func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject Uplo uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID) } + dirName := utils.GetDirectoryName(uploadObject.ObjectName) + // 记录写入的文件的Hash - createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash)) + createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash, dirName)) if err != nil { return 0, "", fmt.Errorf("creating rep object: %w", err) }