|
- package cmd
-
- import (
- "fmt"
- "io"
- "math/rand"
- "sync"
-
- "github.com/samber/lo"
-
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
-
- myio "gitlink.org.cn/cloudream/common/utils/io"
- stgglb "gitlink.org.cn/cloudream/storage/common/globals"
- "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
- "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
- "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
- "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
- coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
- )
-
- type CreateECPackage struct {
- userID int64
- bucketID int64
- name string
- objectIter iterator.UploadingObjectIterator
- redundancy cdssdk.ECRedundancyInfo
- nodeAffinity *int64
- }
-
- type CreateECPackageResult struct {
- PackageID int64
- ObjectResults []ECObjectUploadResult
- }
-
- type ECObjectUploadResult struct {
- Info *iterator.IterUploadingObject
- Error error
- ObjectID int64
- }
-
- func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy cdssdk.ECRedundancyInfo, nodeAffinity *int64) *CreateECPackage {
- return &CreateECPackage{
- userID: userID,
- bucketID: bucketID,
- name: name,
- objectIter: objIter,
- redundancy: redundancy,
- nodeAffinity: nodeAffinity,
- }
- }
-
- func (t *CreateECPackage) Execute(ctx *UpdatePackageContext) (*CreateECPackageResult, error) {
- defer t.objectIter.Close()
-
- coorCli, err := stgglb.CoordinatorMQPool.Acquire()
- if err != nil {
- return nil, fmt.Errorf("new coordinator client: %w", err)
- }
-
- mutex, err := reqbuilder.NewBuilder().
- Metadata().
- // 用于判断用户是否有桶的权限
- UserBucket().ReadOne(t.userID, t.bucketID).
- // 用于查询可用的上传节点
- Node().ReadAny().
- // 用于创建包信息
- Package().CreateOne(t.bucketID, t.name).
- // 用于创建包中的文件的信息
- Object().CreateAny().
- // 用于设置EC配置
- ObjectBlock().CreateAny().
- // 用于创建Cache记录
- Cache().CreateAny().
- MutexLock(ctx.Distlock)
- if err != nil {
- return nil, fmt.Errorf("acquire locks failed, err: %w", err)
- }
- defer mutex.Unlock()
-
- createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
- cdssdk.NewTypedRedundancyInfo(t.redundancy)))
- if err != nil {
- return nil, fmt.Errorf("creating package: %w", err)
- }
-
- getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
- if err != nil {
- return nil, fmt.Errorf("getting user nodes: %w", err)
- }
-
- findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(stgglb.Local.ExternalIP))
- if err != nil {
- return nil, fmt.Errorf("finding client location: %w", err)
- }
-
- uploadNodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
- return UploadNodeInfo{
- Node: node,
- IsSameLocation: node.LocationID == findCliLocResp.Location.LocationID,
- }
- })
-
- getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(t.redundancy.ECName))
- if err != nil {
- return nil, fmt.Errorf("getting ec: %w", err)
- }
-
- // 给上传节点的IPFS加锁
- ipfsReqBlder := reqbuilder.NewBuilder()
- // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
- if stgglb.Local.NodeID != nil {
- ipfsReqBlder.IPFS().CreateAnyRep(*stgglb.Local.NodeID)
- }
- for _, node := range uploadNodeInfos {
- if stgglb.Local.NodeID != nil && node.Node.NodeID == *stgglb.Local.NodeID {
- continue
- }
-
- ipfsReqBlder.IPFS().CreateAnyRep(node.Node.NodeID)
- }
- // 防止上传的副本被清除
- ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock)
- if err != nil {
- return nil, fmt.Errorf("acquire locks failed, err: %w", err)
- }
- defer ipfsMutex.Unlock()
-
- // TODO 需要支持设置节点亲和性
- rets, err := uploadAndUpdateECPackage(createPkgResp.PackageID, t.objectIter, uploadNodeInfos, t.redundancy, getECResp.Config)
- if err != nil {
- return nil, err
- }
-
- return &CreateECPackageResult{
- PackageID: createPkgResp.PackageID,
- ObjectResults: rets,
- }, nil
- }
-
- func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec) ([]ECObjectUploadResult, error) {
- coorCli, err := stgglb.CoordinatorMQPool.Acquire()
- if err != nil {
- return nil, fmt.Errorf("new coordinator client: %w", err)
- }
-
- var uploadRets []ECObjectUploadResult
- //上传文件夹
- var adds []coormq.AddECObjectInfo
- for {
- objInfo, err := objectIter.MoveNext()
- if err == iterator.ErrNoMoreItem {
- break
- }
- if err != nil {
- return nil, fmt.Errorf("reading object: %w", err)
- }
- err = func() error {
- defer objInfo.File.Close()
-
- fileHashes, uploadedNodeIDs, err := uploadECObject(objInfo, uploadNodes, ecInfo, ec)
- uploadRets = append(uploadRets, ECObjectUploadResult{
- Info: objInfo,
- Error: err,
- })
- if err != nil {
- return fmt.Errorf("uploading object: %w", err)
- }
-
- adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs))
- return nil
- }()
- if err != nil {
- return nil, err
- }
- }
-
- _, err = coorCli.UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil))
- if err != nil {
- return nil, fmt.Errorf("updating package: %w", err)
- }
-
- return uploadRets, nil
- }
-
- // 上传文件
- func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ecMod model.Ec) ([]string, []int64, error) {
- uploadNodes = shuffleNodes(uploadNodes, ecMod.EcN)
-
- rs, err := ec.NewRs(ecMod.EcK, ecMod.EcN, ecInfo.ChunkSize)
- if err != nil {
- return nil, nil, err
- }
-
- outputs := myio.ChunkedSplit(obj.File, ecInfo.ChunkSize, ecMod.EcK, myio.ChunkedSplitOption{
- PaddingZeros: true,
- })
- var readers []io.Reader
- for _, o := range outputs {
- readers = append(readers, o)
- }
- defer func() {
- for _, o := range outputs {
- o.Close()
- }
- }()
-
- encStrs := rs.EncodeAll(readers)
-
- wg := sync.WaitGroup{}
-
- nodeIDs := make([]int64, ecMod.EcN)
- fileHashes := make([]string, ecMod.EcN)
- anyErrs := make([]error, ecMod.EcN)
-
- for i := range encStrs {
- idx := i
- wg.Add(1)
- nodeIDs[idx] = uploadNodes[idx].Node.NodeID
- go func() {
- defer wg.Done()
- fileHashes[idx], anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx])
- }()
- }
-
- wg.Wait()
-
- for i, e := range anyErrs {
- if e != nil {
- return nil, nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e)
- }
- }
-
- return fileHashes, nodeIDs, nil
- }
-
- func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo {
- for i := len(uploadNodes); i < extendTo; i++ {
- uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))])
- }
-
- // 随机排列上传节点
- rand.Shuffle(len(uploadNodes), func(i, j int) {
- uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i]
- })
-
- return uploadNodes
- }
|