Browse Source

修改文件批量上传功能的部分代码

gitlink
songjc 2 years ago
parent
commit
f07a864ab8
3 changed files with 114 additions and 70 deletions
  1. +69
    -16
      internal/cmdline/object.go
  2. +9
    -3
      internal/services/object.go
  3. +36
    -51
      internal/task/upload_rep_object.go

+ 69
- 16
internal/cmdline/object.go View File

@@ -73,6 +73,44 @@ func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int
return nil return nil
} }


func ObjectDownloadObjectDir(ctx CommandContext, localFilePath string, dirName string) error {
/* // 创建本地文件夹
curExecPath, err := os.Executable()
if err != nil {
return fmt.Errorf("get executable directory failed, err: %w", err)
}

outputFilePath := filepath.Join(filepath.Dir(curExecPath), localFilePath)
outputFileDir := filepath.Dir(outputFilePath)

err = os.MkdirAll(outputFileDir, os.ModePerm)
if err != nil {
return fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err)
}

outputFile, err := os.Create(outputFilePath)
if err != nil {
return fmt.Errorf("create output file %s failed, err: %w", outputFilePath, err)
}
defer outputFile.Close()

// 下载文件
reader, err := ctx.Cmdline.Svc.ObjectSvc().DownloadObject(0, objectID)
if err != nil {
return fmt.Errorf("download object failed, err: %w", err)
}
defer reader.Close()

bkt := ratelimit.NewBucketWithRate(10*1024, 10*1024)
_, err = io.Copy(outputFile, ratelimit.Reader(reader, bkt))
if err != nil {
// TODO 写入到文件失败,是否要考虑删除这个不完整的文件?
return fmt.Errorf("copy object data to local file failed, err: %w", err)
} */

return nil
}

func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID int, objectName string, repCount int) error { func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID int, objectName string, repCount int) error {
file, err := os.Open(localFilePath) file, err := os.Open(localFilePath)
if err != nil { if err != nil {
@@ -104,13 +142,13 @@ func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID in
} }


for { for {
complete, UploadRepResults, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5)
complete, UploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5)
if complete { if complete {
if err != nil { if err != nil {
return fmt.Errorf("uploading rep object: %w", err) return fmt.Errorf("uploading rep object: %w", err)
} }


fmt.Print(UploadRepResults[0].ResultFileHash)
fmt.Print(UploadObjectResult.UploadRepResults[0].ResultFileHash)
return nil return nil
} }


@@ -149,32 +187,45 @@ func ObjectUploadRepObjectDir(ctx CommandContext, localDirPath string, bucketID
} }


// 遍历 关闭文件流 // 遍历 关闭文件流
for _, uploadFile := range uploadFiles {
defer uploadFile.File.Close()
}
defer func() {
for _, uploadFile := range uploadFiles {
uploadFile.File.Close()
}
}()


taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadFiles, repCount) taskID, err := ctx.Cmdline.Svc.ObjectSvc().StartUploadingRepObjects(0, bucketID, uploadFiles, repCount)

if err != nil { if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err) return fmt.Errorf("upload file data failed, err: %w", err)
} }


for { for {
complete, UploadRepResults, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5)
complete, UploadObjectResult, err := ctx.Cmdline.Svc.ObjectSvc().WaitUploadingRepObjects(taskID, time.Second*5)
if complete { if complete {

tb := table.NewWriter()
tb.AppendHeader(table.Row{"ObjectID", "FileHash"})

for _, uploadRepResult := range UploadRepResults {
tb.AppendRow(table.Row{uploadRepResult.ObjectID, uploadRepResult.ResultFileHash})
}

fmt.Print(tb.Render())

if err != nil { if err != nil {
return fmt.Errorf("uploading rep object: %w", err) return fmt.Errorf("uploading rep object: %w", err)
} }


tb := table.NewWriter()
if UploadObjectResult.IsUploading {

tb.AppendHeader(table.Row{"ObjectID", "ObjectName", "FileHash"})
for i := 0; i < len(UploadObjectResult.UploadObjects); i++ {
tb.AppendRow(table.Row{UploadObjectResult.UploadRepResults[i].ObjectID, UploadObjectResult.UploadObjects[i].ObjectName, UploadObjectResult.UploadRepResults[i].ResultFileHash})
}
fmt.Print(tb.Render())

} else {
fmt.Println("The folder upload failed. Some files do not meet the upload requirements.")

tb.AppendHeader(table.Row{"ObjectName", "Error"})
for i := 0; i < len(UploadObjectResult.UploadObjects); i++ {
if UploadObjectResult.UploadRepResults[i].Error != nil {
tb.AppendRow(table.Row{UploadObjectResult.UploadObjects[i].ObjectName, UploadObjectResult.UploadRepResults[i].Error})
}
}
fmt.Print(tb.Render())
}
return nil return nil
} }


@@ -250,6 +301,8 @@ func init() {


commands.MustAdd(ObjectDownloadObject, "object", "get") commands.MustAdd(ObjectDownloadObject, "object", "get")


commands.MustAdd(ObjectDownloadObjectDir, "object", "get", "dir")

commands.MustAdd(ObjectUpdateRepObject, "object", "update", "rep") commands.MustAdd(ObjectUpdateRepObject, "object", "update", "rep")


commands.MustAdd(ObjectDeleteObject, "object", "delete") commands.MustAdd(ObjectDeleteObject, "object", "delete")


+ 9
- 3
internal/services/object.go View File

@@ -181,12 +181,18 @@ func (svc *ObjectService) StartUploadingRepObjects(userID int, bucketID int, upl
return tsk.ID(), nil return tsk.ID(), nil
} }


func (svc *ObjectService) WaitUploadingRepObjects(taskID string, waitTimeout time.Duration) (bool, []task.UploadRepResult, error) {
func (svc *ObjectService) WaitUploadingRepObjects(taskID string, waitTimeout time.Duration) (bool, task.UploadObjectResult, error) {
tsk := svc.taskMgr.FindByID(taskID) tsk := svc.taskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) { if tsk.WaitTimeout(waitTimeout) {
return true, tsk.Body().(*task.UploadRepObject).UploadRepResults, tsk.Error()
uploadObjectResult := task.UploadObjectResult{
UploadObjects: tsk.Body().(*task.UploadRepObject).UploadObjects,
UploadRepResults: tsk.Body().(*task.UploadRepObject).UploadRepResults,
IsUploading: tsk.Body().(*task.UploadRepObject).IsUploading,
}

return true, uploadObjectResult, tsk.Error()
} }
return false, nil, nil
return false, task.UploadObjectResult{}, nil
} }


func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error { func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error {


+ 36
- 51
internal/task/upload_rep_object.go View File

@@ -23,13 +23,20 @@ import (
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
) )


// UploadObjects和UploadRepResults一一对应
// UploadObjects和UploadRepResults一一对应关系
type UploadRepObject struct { type UploadRepObject struct {
userID int userID int
bucketID int bucketID int
repCount int repCount int
UploadObjects []UploadObject UploadObjects []UploadObject
UploadRepResults []UploadRepResult UploadRepResults []UploadRepResult
IsUploading bool
}

type UploadObjectResult struct {
UploadObjects []UploadObject
UploadRepResults []UploadRepResult
IsUploading bool
} }


type UploadObject struct { type UploadObject struct {
@@ -54,15 +61,13 @@ func NewUploadRepObject(userID int, bucketID int, UploadObjects []UploadObject,
} }


func (t *UploadRepObject) Execute(ctx TaskContext, complete CompleteFn) { func (t *UploadRepObject) Execute(ctx TaskContext, complete CompleteFn) {

UploadRepResults, err := t.do(ctx)
t.UploadRepResults = UploadRepResults
err := t.do(ctx)
complete(err, CompleteOption{ complete(err, CompleteOption{
RemovingDelay: time.Minute, RemovingDelay: time.Minute,
}) })
} }


func (t *UploadRepObject) do(ctx TaskContext) ([]UploadRepResult, error) {
func (t *UploadRepObject) do(ctx TaskContext) error {


reqBlder := reqbuilder.NewBuilder() reqBlder := reqbuilder.NewBuilder()
for _, uploadObject := range t.UploadObjects { for _, uploadObject := range t.UploadObjects {
@@ -82,72 +87,52 @@ func (t *UploadRepObject) do(ctx TaskContext) ([]UploadRepResult, error) {
Cache().CreateAny(). Cache().CreateAny().
MutexLock(ctx.DistLock) MutexLock(ctx.DistLock)
if err != nil { if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
return fmt.Errorf("acquire locks failed, err: %w", err)
} }
defer mutex.Unlock() defer mutex.Unlock()


uploadObjs := []struct {
UploadObject UploadObject
nodes []ramsg.RespNode
}{}
var repWriteResps []*coormsg.PreUploadResp


for _, uploadObject := range t.UploadObjects {
nodes, err := t.preUploadSingleObject(ctx, uploadObject)
//判断是否所有文件都符合上传条件
flag := true
for i := 0; i < len(t.UploadObjects); i++ {
repWriteResp, err := t.preUploadSingleObject(ctx, t.UploadObjects[i])
if err != nil { if err != nil {
// 不满足上传条件,直接记录结果
result := UploadRepResult{
Error: err,
ResultFileHash: "",
ObjectID: 0,
}
t.UploadRepResults = append(t.UploadRepResults, result)
flag = false
t.UploadRepResults = append(t.UploadRepResults,
UploadRepResult{
Error: err,
ResultFileHash: "",
ObjectID: 0,
})
continue continue
} }

obj := struct {
UploadObject UploadObject
nodes []ramsg.RespNode
}{
UploadObject: uploadObject,
nodes: nodes,
}
uploadObjs = append(uploadObjs, obj)
t.UploadRepResults = append(t.UploadRepResults, UploadRepResult{})
repWriteResps = append(repWriteResps, repWriteResp)
} }


// 不满足上传条件,返回结果
if len(uploadObjs) != len(t.UploadObjects) {
return t.UploadRepResults, fmt.Errorf("Folder does not meet the upload requirements.")
// 不满足上传条件,返回各文件检查结果
if !flag {
return nil
} }


//上传文件夹 //上传文件夹
for _, uploadObj := range uploadObjs {
objectID, fileHash, err := t.uploadSingleObject(ctx, uploadObj.UploadObject, uploadObj.nodes)
if err != nil {
// 上传文件时出现错误,记录结果
result := UploadRepResult{
Error: err,
ResultFileHash: "",
ObjectID: 0,
}
t.UploadRepResults = append(t.UploadRepResults, result)
continue
}

// 文件上传成功,记录结果
result := UploadRepResult{
t.IsUploading = true
for i := 0; i < len(repWriteResps); i++ {
objectID, fileHash, err := t.uploadSingleObject(ctx, t.UploadObjects[i], repWriteResps[i].Nodes)
// 记录文件上传结果
t.UploadRepResults[i] = UploadRepResult{
Error: err, Error: err,
ResultFileHash: fileHash, ResultFileHash: fileHash,
ObjectID: objectID, ObjectID: objectID,
} }
t.UploadRepResults = append(t.UploadRepResults, result)
} }
return t.UploadRepResults, nil
return nil
} }


// 检查单个文件是否能够上传 // 检查单个文件是否能够上传
func (t *UploadRepObject) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) ([]ramsg.RespNode, error) {
func (t *UploadRepObject) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) {
//发送写请求,请求Coor分配写入节点Ip //发送写请求,请求Coor分配写入节点Ip
// fmt.Printf("uploadObject: %v\n", uploadObject)
repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP)) repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP))
if err != nil { if err != nil {
return nil, fmt.Errorf("pre upload rep object: %w", err) return nil, fmt.Errorf("pre upload rep object: %w", err)
@@ -155,7 +140,7 @@ func (t *UploadRepObject) preUploadSingleObject(ctx TaskContext, uploadObject Up
if len(repWriteResp.Nodes) == 0 { if len(repWriteResp.Nodes) == 0 {
return nil, fmt.Errorf("no node to upload file") return nil, fmt.Errorf("no node to upload file")
} }
return repWriteResp.Nodes, nil
return repWriteResp, nil
} }


// 上传文件 // 上传文件


Loading…
Cancel
Save