Browse Source

Merge pull request 'EC的PacketSize保存到备份方式的配置中;优化公共结构体' (#6) from feature_gxh into master

gitlink
baohan 2 years ago
parent
commit
64cc7d5e0c
6 changed files with 34 additions and 47 deletions
  1. +10
    -15
      pkgs/cmd/create_ec_package.go
  2. +1
    -1
      pkgs/cmd/create_rep_package.go
  3. +7
    -12
      pkgs/cmd/download_package.go
  4. +5
    -6
      pkgs/cmd/update_ec_package.go
  5. +1
    -1
      pkgs/db/package.go
  6. +10
    -12
      pkgs/iterator/ec_object_iterator.go

+ 10
- 15
pkgs/cmd/create_ec_package.go View File

@@ -27,11 +27,6 @@ type CreateECPackage struct {
redundancy models.ECRedundancyInfo
}

type UpdateECPackageContext struct {
*UpdatePackageContext
ECPacketSize int64
}

type CreateECPackageResult struct {
PackageID int64
ObjectResults []ECObjectUploadResult
@@ -53,7 +48,7 @@ func NewCreateECPackage(userID int64, bucketID int64, name string, objIter itera
}
}

func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackageResult, error) {
func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageResult, error) {
defer t.objectIter.Close()

coorCli, err := globals.CoordinatorMQPool.Acquire()
@@ -82,7 +77,7 @@ func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackage
defer mutex.Unlock()

createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy)))
models.NewTypedRedundancyInfo(t.redundancy)))
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}
@@ -129,7 +124,7 @@ func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackage
}
defer ipfsMutex.Unlock()

rets, err := uploadAndUpdateECPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNodeInfos, getECResp.Config)
rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config)
if err != nil {
return nil, err
}
@@ -140,7 +135,7 @@ func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackage
}, nil
}

func uploadAndUpdateECPackage(ctx *UpdateECPackageContext, packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ec model.Ec) ([]ECObjectUploadResult, error) {
func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo models.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -158,7 +153,7 @@ func uploadAndUpdateECPackage(ctx *UpdateECPackageContext, packageID int64, obje
return nil, fmt.Errorf("reading object: %w", err)
}

fileHashes, uploadedNodeIDs, err := uploadECObject(ctx, objInfo, uploadNodes, ec)
fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec)
uploadRets = append(uploadRets, ECObjectUploadResult{
Info: objInfo,
Error: err,
@@ -179,7 +174,7 @@ func uploadAndUpdateECPackage(ctx *UpdateECPackageContext, packageID int64, obje
}

// 上传文件
func uploadECObject(ctx *UpdateECPackageContext, obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ec model.Ec) ([]string, []int64, error) {
func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo models.ECRedundancyInfo, ec model.Ec) ([]string, []int64, error) {
//生成纠删码的写入节点序列
nodes := make([]UploadNodeInfo, ec.EcN)
numNodes := len(uploadNodes)
@@ -188,7 +183,7 @@ func uploadECObject(ctx *UpdateECPackageContext, obj *iterator.IterUploadingObje
nodes[i] = uploadNodes[(startWriteNodeID+i)%numNodes]
}

hashs, err := ecWrite(ctx, obj.File, obj.Size, ec.EcK, ec.EcN, nodes)
hashs, err := ecWrite(obj.File, obj.Size, ecInfo.PacketSize, ec.EcK, ec.EcN, nodes)
if err != nil {
return nil, nil, fmt.Errorf("EcWrite failed, err: %w", err)
}
@@ -213,13 +208,13 @@ func (t *CreateECPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInf
return nodes[rand.Intn(len(nodes))]
}

func ecWrite(ctx *UpdateECPackageContext, file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []UploadNodeInfo) ([]string, error) {
func ecWrite(file io.ReadCloser, fileSize int64, packetSize int64, ecK int, ecN int, nodes []UploadNodeInfo) ([]string, error) {
// TODO 需要参考RepWrite函数的代码逻辑,做好错误处理
//获取文件大小

var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN
//计算每个块的packet数
numPacket := (fileSize + int64(ecK)*ctx.ECPacketSize - 1) / (int64(ecK) * ctx.ECPacketSize)
numPacket := (fileSize + int64(ecK)*packetSize - 1) / (int64(ecK) * packetSize)
//fmt.Println(numPacket)
//创建channel
loadBufs := make([]chan []byte, ecN)
@@ -232,7 +227,7 @@ func ecWrite(ctx *UpdateECPackageContext, file io.ReadCloser, fileSize int64, ec
}
hashs := make([]string, ecN)
//正式开始写入
go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), ctx.ECPacketSize) //从本地文件系统加载数据
go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), packetSize) //从本地文件系统加载数据
go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)

var wg sync.WaitGroup


+ 1
- 1
pkgs/cmd/create_rep_package.go View File

@@ -92,7 +92,7 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage
defer mutex.Unlock()

createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy)))
models.NewTypedRedundancyInfo(t.redundancy)))
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}


+ 7
- 12
pkgs/cmd/download_package.go View File

@@ -8,7 +8,6 @@ import (

"gitlink.org.cn/cloudream/common/models"
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
@@ -22,8 +21,7 @@ type DownloadPackage struct {
}

type DownloadPackageContext struct {
Distlock *distsvc.Service
ECPacketSize int64
Distlock *distsvc.Service
}

func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage {
@@ -48,7 +46,7 @@ func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error {
}

var objIter iterator.DownloadingObjectIterator
if getPkgResp.Redundancy.Type == models.RedundancyRep {
if getPkgResp.Redundancy.IsRepInfo() {
objIter, err = t.downloadRep(ctx)
} else {
objIter, err = t.downloadEC(ctx, getPkgResp.Package)
@@ -102,21 +100,18 @@ func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Pack
return nil, fmt.Errorf("getting package object ec data: %w", err)
}

var ecRed models.ECRedundancyInfo
if err := serder.AnyToAny(pkg.Redundancy.Info, &ecRed); err != nil {
var ecInfo models.ECRedundancyInfo
if ecInfo, err = pkg.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}

iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, getECResp.Config, &iterator.ECDownloadContext{
DownloadContext: &iterator.DownloadContext{
Distlock: ctx.Distlock,
},
ECPacketSize: ctx.ECPacketSize,
iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, ecInfo, getECResp.Config, &iterator.DownloadContext{
Distlock: ctx.Distlock,
})

return iter, nil


+ 5
- 6
pkgs/cmd/update_ec_package.go View File

@@ -5,7 +5,6 @@ import (

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/utils/serder"

"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
@@ -32,7 +31,7 @@ func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.Uploadin
}
}

func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackageResult, error) {
func (t *UpdateECPackage) Execute(ctx *UpdatePackageContext) (*UpdateECPackageResult, error) {
defer t.objectIter.Close()

coorCli, err := globals.CoordinatorMQPool.Acquire()
@@ -80,12 +79,12 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage
}
})

var ecRed models.ECRedundancyInfo
if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil {
var ecInfo models.ECRedundancyInfo
if ecInfo, err = getPkgResp.Package.Redundancy.ToECInfo(); err != nil {
return nil, fmt.Errorf("get ec redundancy info: %w", err)
}

getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecInfo.ECName))
if err != nil {
return nil, fmt.Errorf("getting ec: %w", err)
}
@@ -110,7 +109,7 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage
}
defer ipfsMutex.Unlock()

rets, err := uploadAndUpdateECPackage(ctx, t.packageID, t.objectIter, nodeInfos, getECResp.Config)
rets, err := uploadAndUpdateECPackage(t.packageID, t.objectIter, nodeInfos, ecInfo, getECResp.Config)
if err != nil {
return nil, err
}


+ 1
- 1
pkgs/db/package.go View File

@@ -128,7 +128,7 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID int64) error {
return fmt.Errorf("change package state failed, err: %w", err)
}

if obj.Redundancy.Type == models.RedundancyRep {
if obj.Redundancy.IsRepInfo() {
err = db.ObjectRep().DeleteInPackage(ctx, packageID)
if err != nil {
return fmt.Errorf("delete from object rep failed, err: %w", err)


+ 10
- 12
pkgs/iterator/ec_object_iterator.go View File

@@ -7,9 +7,10 @@ import (
"os"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/models"
stgmodels "gitlink.org.cn/cloudream/storage-common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/ec"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
@@ -19,24 +20,21 @@ type ECObjectIterator struct {
OnClosing func()

objects []model.Object
objectECData []models.ObjectECData
objectECData []stgmodels.ObjectECData
currentIndex int
inited bool

ecInfo models.ECRedundancyInfo
ec model.Ec
downloadCtx *ECDownloadContext
downloadCtx *DownloadContext
cliLocation model.Location
}

type ECDownloadContext struct {
*DownloadContext
ECPacketSize int64
}

func NewECObjectIterator(objects []model.Object, objectECData []models.ObjectECData, ec model.Ec, downloadCtx *ECDownloadContext) *ECObjectIterator {
func NewECObjectIterator(objects []model.Object, objectECData []stgmodels.ObjectECData, ecInfo models.ECRedundancyInfo, ec model.Ec, downloadCtx *DownloadContext) *ECObjectIterator {
return &ECObjectIterator{
objects: objects,
objectECData: objectECData,
ecInfo: ecInfo,
ec: ec,
downloadCtx: downloadCtx,
}
@@ -146,7 +144,7 @@ func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downlo
func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error) {
// TODO zkx 先试用同步方式实现逻辑,做好错误处理。同时也方便下面直接使用uploadToNode和uploadToLocalIPFS来优化代码结构
//wg := sync.WaitGroup{}
numPacket := (fileSize + int64(ecK)*iter.downloadCtx.ECPacketSize - 1) / (int64(ecK) * iter.downloadCtx.ECPacketSize)
numPacket := (fileSize + int64(ecK)*iter.ecInfo.PacketSize - 1) / (int64(ecK) * iter.ecInfo.PacketSize)
getBufs := make([]chan []byte, ecN)
decodeBufs := make([]chan []byte, ecK)
for i := 0; i < ecN; i++ {
@@ -159,10 +157,10 @@ func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int,
i := idx
go func() {
// TODO 处理错误
file, _ := downloadFile(iter.downloadCtx.DownloadContext, nodeIDs[i], nodeIPs[i], hashs[i])
file, _ := downloadFile(iter.downloadCtx, nodeIDs[i], nodeIPs[i], hashs[i])

for p := int64(0); p < numPacket; p++ {
buf := make([]byte, iter.downloadCtx.ECPacketSize)
buf := make([]byte, iter.ecInfo.PacketSize)
// TODO 处理错误
io.ReadFull(file, buf)
getBufs[blockIDs[i]] <- buf


Loading…
Cancel
Save